diff --git a/Makefile b/Makefile index a7eeb7cb5..cac3eae32 100644 --- a/Makefile +++ b/Makefile @@ -789,7 +789,9 @@ endif # PLATFORM_SHARED_EXT blackbox_crash_test_with_atomic_flush whitebox_crash_test_with_atomic_flush \ blackbox_crash_test_with_txn whitebox_crash_test_with_txn \ blackbox_crash_test_with_best_efforts_recovery \ - blackbox_crash_test_with_ts whitebox_crash_test_with_ts + blackbox_crash_test_with_ts whitebox_crash_test_with_ts \ + blackbox_crash_test_with_multiops_wc_txn \ + blackbox_crash_test_with_multiops_wp_txn all: $(LIBRARY) $(BENCHMARKS) tools tools_lib test_libs $(TESTS) diff --git a/crash_test.mk b/crash_test.mk index 26d260977..d1f443e57 100644 --- a/crash_test.mk +++ b/crash_test.mk @@ -16,7 +16,9 @@ CRASHTEST_PY=$(PYTHON) -u tools/db_crashtest.py --stress_cmd=$(DB_STRESS_CMD) blackbox_crash_test_with_txn blackbox_crash_test_with_ts \ blackbox_crash_test_with_best_efforts_recovery \ whitebox_crash_test whitebox_crash_test_with_atomic_flush \ - whitebox_crash_test_with_txn whitebox_crash_test_with_ts + whitebox_crash_test_with_txn whitebox_crash_test_with_ts \ + blackbox_crash_test_with_multiops_wc_txn \ + blackbox_crash_test_with_multiops_wp_txn crash_test: $(DB_STRESS_CMD) # Do not parallelize @@ -56,6 +58,12 @@ blackbox_crash_test_with_best_efforts_recovery: $(DB_STRESS_CMD) blackbox_crash_test_with_ts: $(DB_STRESS_CMD) $(CRASHTEST_PY) --enable_ts blackbox $(CRASH_TEST_EXT_ARGS) +blackbox_crash_test_with_multiops_wc_txn: $(DB_STRESS_CMD) + $(PYTHON) -u tools/db_crashtest.py --test_multiops_txn --write_policy write_committed blackbox $(CRASH_TEST_EXT_ARGS) + +blackbox_crash_test_with_multiops_wp_txn: $(DB_STRESS_CMD) + $(PYTHON) -u tools/db_crashtest.py --test_multiops_txn --write_policy write_prepared blackbox $(CRASH_TEST_EXT_ARGS) + ifeq ($(CRASH_TEST_KILL_ODD),) CRASH_TEST_KILL_ODD=888887 endif diff --git a/db/write_batch.cc b/db/write_batch.cc index 31e2db27f..f3b800e73 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ 
-2307,6 +2307,7 @@ class MemTableInserter : public WriteBatch::Handler { assert(db_); if (recovering_log_number_ != 0) { + db_->mutex()->AssertHeld(); // during recovery we rebuild a hollow transaction // from all encountered prepare sections of the wal if (db_->allow_2pc() == false) { @@ -2337,6 +2338,7 @@ class MemTableInserter : public WriteBatch::Handler { assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0)); if (recovering_log_number_ != 0) { + db_->mutex()->AssertHeld(); assert(db_->allow_2pc()); size_t batch_cnt = write_after_commit_ @@ -2357,6 +2359,9 @@ class MemTableInserter : public WriteBatch::Handler { } Status MarkNoop(bool empty_batch) override { + if (recovering_log_number_ != 0) { + db_->mutex()->AssertHeld(); + } // A hack in pessimistic transaction could result into a noop at the start // of the write batch, that should be ignored. if (!empty_batch) { diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 137339f6c..109b0d3b3 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -115,6 +115,7 @@ DECLARE_int32(level0_stop_writes_trigger); DECLARE_int32(block_size); DECLARE_int32(format_version); DECLARE_int32(index_block_restart_interval); +DECLARE_bool(disable_auto_compactions); DECLARE_int32(max_background_compactions); DECLARE_int32(num_bottom_pri_threads); DECLARE_int32(compaction_thread_pool_adjust_interval); @@ -274,6 +275,13 @@ DECLARE_int32(secondary_cache_fault_one_in); DECLARE_int32(prepopulate_block_cache); +DECLARE_bool(two_write_queues); +#ifndef ROCKSDB_LITE +DECLARE_bool(use_only_the_last_commit_time_batch_for_recovery); +DECLARE_uint64(wp_snapshot_cache_bits); +DECLARE_uint64(wp_commit_cache_bits); +#endif // !ROCKSDB_LITE + constexpr long KB = 1024; constexpr int kRandomValueMaxFactor = 3; constexpr int kValueMaxLen = 100; diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index c973eeb55..205cff68c 100644 --- 
a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -225,6 +225,10 @@ DEFINE_int32( "Number of keys between restart points " "for delta encoding of keys in index block."); +DEFINE_bool(disable_auto_compactions, + ROCKSDB_NAMESPACE::Options().disable_auto_compactions, + "If true, RocksDB internally will not trigger compactions."); + DEFINE_int32(max_background_compactions, ROCKSDB_NAMESPACE::Options().max_background_compactions, "The maximum number of concurrent background compactions " @@ -890,4 +894,22 @@ DEFINE_int32(prepopulate_block_cache, "Options related to cache warming (see `enum " "PrepopulateBlockCache` in table.h)"); +DEFINE_bool(two_write_queues, false, + "Set to true to enable two write queues. Default: false"); +#ifndef ROCKSDB_LITE + +DEFINE_bool(use_only_the_last_commit_time_batch_for_recovery, false, + "If true, the commit-time write batch will not be immediately " + "inserted into the memtables. Default: false"); + +DEFINE_uint64( + wp_snapshot_cache_bits, 7ull, + "Number of bits to represent write-prepared transaction db's snapshot " + "cache. Default: 7 (128 entries)"); + +DEFINE_uint64(wp_commit_cache_bits, 23ull, + "Number of bits to represent write-prepared transaction db's " + "commit cache. 
Default: 23 (8M entries)"); +#endif // !ROCKSDB_LITE + #endif // GFLAGS diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index c2c793b9f..ed71b1752 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -604,8 +604,11 @@ Status StressTest::NewTxn(WriteOptions& write_opts, Transaction** txn) { if (!FLAGS_use_txn) { return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set"); } + write_opts.disableWAL = FLAGS_disable_wal; static std::atomic txn_id = {0}; TransactionOptions txn_options; + txn_options.use_only_the_last_commit_time_batch_for_recovery = + FLAGS_use_only_the_last_commit_time_batch_for_recovery; txn_options.lock_timeout = 600000; // 10 min txn_options.deadlock_detect = true; *txn = txn_db_->BeginTransaction(write_opts, txn_options); @@ -2153,6 +2156,28 @@ void StressTest::PrintEnv() const { fprintf(stdout, "Format version : %d\n", FLAGS_format_version); fprintf(stdout, "TransactionDB : %s\n", FLAGS_use_txn ? "true" : "false"); + + if (FLAGS_use_txn) { +#ifndef ROCKSDB_LITE + fprintf(stdout, "Two write queues: : %s\n", + FLAGS_two_write_queues ? "true" : "false"); + fprintf(stdout, "Write policy : %d\n", + static_cast(FLAGS_txn_write_policy)); + if (static_cast(TxnDBWritePolicy::WRITE_PREPARED) == + FLAGS_txn_write_policy || + static_cast(TxnDBWritePolicy::WRITE_UNPREPARED) == + FLAGS_txn_write_policy) { + fprintf(stdout, "Snapshot cache bits : %d\n", + static_cast(FLAGS_wp_snapshot_cache_bits)); + fprintf(stdout, "Commit cache bits : %d\n", + static_cast(FLAGS_wp_commit_cache_bits)); + } + fprintf(stdout, "last cwb for recovery : %s\n", + FLAGS_use_only_the_last_commit_time_batch_for_recovery ? "true" + : "false"); +#endif // !ROCKSDB_LITE + } + #ifndef ROCKSDB_LITE fprintf(stdout, "Stacked BlobDB : %s\n", FLAGS_use_blob_db ? 
"true" : "false"); @@ -2316,6 +2341,7 @@ void StressTest::Open() { options_.memtable_prefix_bloom_size_ratio = FLAGS_memtable_prefix_bloom_size_ratio; options_.memtable_whole_key_filtering = FLAGS_memtable_whole_key_filtering; + options_.disable_auto_compactions = FLAGS_disable_auto_compactions; options_.max_background_compactions = FLAGS_max_background_compactions; options_.max_background_flushes = FLAGS_max_background_flushes; options_.compaction_style = @@ -2704,7 +2730,13 @@ void StressTest::Open() { options_.unordered_write = true; options_.two_write_queues = true; txn_db_options.skip_concurrency_control = true; + } else { + options_.two_write_queues = FLAGS_two_write_queues; } + txn_db_options.wp_snapshot_cache_bits = + static_cast(FLAGS_wp_snapshot_cache_bits); + txn_db_options.wp_commit_cache_bits = + static_cast(FLAGS_wp_commit_cache_bits); s = TransactionDB::Open(options_, txn_db_options, FLAGS_db, cf_descriptors, &column_families_, &txn_db_); if (!s.ok()) { @@ -2762,7 +2794,11 @@ void StressTest::Open() { exit(1); #endif } - if (s.ok() && FLAGS_continuous_verification_interval > 0 && !cmp_db_) { + // Secondary instance does not support write-prepared/write-unprepared + // transactions, thus just disable secondary instance if we use + // transaction. + if (s.ok() && FLAGS_continuous_verification_interval > 0 && + !FLAGS_use_txn && !cmp_db_) { Options tmp_opts; // TODO(yanqin) support max_open_files != -1 for secondary instance. tmp_opts.max_open_files = -1; diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index e51f17a15..985af56ac 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -18,12 +18,117 @@ namespace ROCKSDB_NAMESPACE { -// TODO: move these to gflags. 
-static constexpr uint32_t kInitNumC = 1000; -#ifndef ROCKSDB_LITE -static constexpr uint32_t kInitialCARatio = 3; -#endif // ROCKSDB_LITE -static constexpr bool kDoPreload = true; +// The description of A and C can be found in multi_ops_txns_stress.h +DEFINE_int32(lb_a, 0, "(Inclusive) lower bound of A"); +DEFINE_int32(ub_a, 1000, "(Exclusive) upper bound of A"); +DEFINE_int32(lb_c, 0, "(Inclusive) lower bound of C"); +DEFINE_int32(ub_c, 1000, "(Exclusive) upper bound of C"); + +DEFINE_string(key_spaces_path, "", + "Path to file describing the lower and upper bounds of A and C"); + +DEFINE_int32(delay_snapshot_read_one_in, 0, + "With a chance of 1/N, inject a random delay between taking " + "snapshot and read."); + +// MultiOpsTxnsStressTest can either operate on a database with pre-populated +// data (possibly from previous ones), or create a new db and preload it with +// data specified via `-lb_a`, `-ub_a`, `-lb_c`, `-ub_c`, etc. Among these, we +// define the test key spaces as two key ranges: [lb_a, ub_a) and [lb_c, ub_c). +// The key spaces specification is persisted in a file whose absolute path can +// be specified via `-key_spaces_path`. +// +// Whether an existing db is used or a new one is created, key_spaces_path will +// be used. In the former case, the test reads the key spaces specification +// from `-key_spaces_path` and decodes [lb_a, ub_a) and [lb_c, ub_c). In the +// latter case, the test writes a key spaces specification to a file at the +// location, and this file will be used by future runs until a new db is +// created. +// +// Create a fresh new database (-destroy_db_initially=1 or there is no database +// in the location specified by -db). See PreloadDb(). +// +// Use an existing, non-empty database. See ScanExistingDb(). +// +// This test is multi-threaded, and thread count can be specified via +// `-threads`. For simplicity, we partition the key ranges and each thread +// operates on a subrange independently. 
+// Within each subrange, a KeyGenerator object is responsible for key +// generation. A KeyGenerator maintains two sets: set of existing keys within +// [low, high), set of non-existing keys within [low, high). [low, high) is the +// subrange. The test initialization makes sure there is at least one +// non-existing key, otherwise the test will return an error and exit before +// any test thread is spawned. + +void MultiOpsTxnsStressTest::KeyGenerator::FinishInit() { + assert(existing_.empty()); + assert(!existing_uniq_.empty()); + assert(low_ < high_); + for (auto v : existing_uniq_) { + assert(low_ <= v); + assert(high_ > v); + existing_.push_back(v); + } + if (non_existing_uniq_.empty()) { + fprintf( + stderr, + "Cannot allocate key in [%u, %u)\nStart with a new DB or try change " + "the number of threads for testing via -threads=<#threads>\n", + static_cast(low_), static_cast(high_)); + fflush(stderr); + std::terminate(); + } + initialized_ = true; +} + +std::pair +MultiOpsTxnsStressTest::KeyGenerator::ChooseExisting() { + assert(initialized_); + const size_t N = existing_.size(); + assert(N > 0); + uint32_t rnd = rand_.Uniform(static_cast(N)); + assert(rnd < N); + return std::make_pair(existing_[rnd], rnd); +} + +uint32_t MultiOpsTxnsStressTest::KeyGenerator::Allocate() { + assert(initialized_); + auto it = non_existing_uniq_.begin(); + assert(non_existing_uniq_.end() != it); + uint32_t ret = *it; + // Remove this element from non_existing_. + // Need to call UndoAllocation() if the calling transaction does not commit. 
+ non_existing_uniq_.erase(it); + return ret; +} + +void MultiOpsTxnsStressTest::KeyGenerator::Replace(uint32_t old_val, + uint32_t old_pos, + uint32_t new_val) { + assert(initialized_); + { + auto it = existing_uniq_.find(old_val); + assert(it != existing_uniq_.end()); + existing_uniq_.erase(it); + } + + { + assert(0 == existing_uniq_.count(new_val)); + existing_uniq_.insert(new_val); + existing_[old_pos] = new_val; + } + + { + assert(0 == non_existing_uniq_.count(old_val)); + non_existing_uniq_.insert(old_val); + } +} + +void MultiOpsTxnsStressTest::KeyGenerator::UndoAllocation(uint32_t new_val) { + assert(initialized_); + assert(0 == non_existing_uniq_.count(new_val)); + non_existing_uniq_.insert(new_val); +} std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) { char buf[8]; @@ -223,56 +328,42 @@ void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) { if (FLAGS_enable_compaction_filter) { // TODO (yanqin) enable compaction filter } - if (kDoPreload) { - ReopenAndPreloadDb(shared); + ReopenAndPreloadDbIfNeeded(shared); + // TODO (yanqin) parallelize if key space is large + for (auto& key_gen : key_gen_for_a_) { + assert(key_gen); + key_gen->FinishInit(); + } + // TODO (yanqin) parallelize if key space is large + for (auto& key_gen : key_gen_for_c_) { + assert(key_gen); + key_gen->FinishInit(); } } -void MultiOpsTxnsStressTest::ReopenAndPreloadDb(SharedState* shared) { +void MultiOpsTxnsStressTest::ReopenAndPreloadDbIfNeeded(SharedState* shared) { (void)shared; #ifndef ROCKSDB_LITE - std::vector cf_descs; - for (const auto* handle : column_families_) { - cf_descs.emplace_back(handle->GetName(), ColumnFamilyOptions(options_)); - } - CancelAllBackgroundWork(db_, /*wait=*/true); - for (auto* handle : column_families_) { - delete handle; - } - column_families_.clear(); - delete db_; - db_ = nullptr; - txn_db_ = nullptr; - - TransactionDBOptions txn_db_opts; - txn_db_opts.skip_concurrency_control = true; // speed-up preloading - Status s = 
TransactionDB::Open(options_, txn_db_opts, FLAGS_db, cf_descs, - &column_families_, &txn_db_); - if (s.ok()) { - db_ = txn_db_; - } else { - fprintf(stderr, "Failed to open db: %s\n", s.ToString().c_str()); - exit(1); + bool db_empty = false; + { + std::unique_ptr iter(db_->NewIterator(ReadOptions())); + iter->SeekToFirst(); + if (!iter->Valid()) { + db_empty = true; + } } - PreloadDb(shared, kInitNumC); - - // Reopen - CancelAllBackgroundWork(db_, /*wait=*/true); - for (auto* handle : column_families_) { - delete handle; - } - column_families_.clear(); - s = db_->Close(); - if (!s.ok()) { - fprintf(stderr, "Error during closing db: %s\n", s.ToString().c_str()); - exit(1); + if (db_empty) { + PreloadDb(shared, FLAGS_threads, FLAGS_lb_a, FLAGS_ub_a, FLAGS_lb_c, + FLAGS_ub_c); + } else { + fprintf(stdout, + "Key ranges will be read from %s.\n-lb_a, -ub_a, -lb_c, -ub_c will " + "be ignored\n", + FLAGS_key_spaces_path.c_str()); + fflush(stdout); + ScanExistingDb(shared, FLAGS_threads); } - delete db_; - db_ = nullptr; - txn_db_ = nullptr; - - Open(); #endif // !ROCKSDB_LITE } @@ -281,7 +372,9 @@ Status MultiOpsTxnsStressTest::TestGet( ThreadState* thread, const ReadOptions& read_opts, const std::vector& /*rand_column_families*/, const std::vector& /*rand_keys*/) { - uint32_t a = ChooseA(thread); + uint32_t a = 0; + uint32_t pos = 0; + std::tie(a, pos) = ChooseExistingA(thread); return PointLookupTxn(thread, read_opts, a); } @@ -310,7 +403,9 @@ Status MultiOpsTxnsStressTest::TestIterate( ThreadState* thread, const ReadOptions& read_opts, const std::vector& /*rand_column_families*/, const std::vector& /*rand_keys*/) { - uint32_t c = thread->rand.Next() % kInitNumC; + uint32_t c = 0; + uint32_t pos = 0; + std::tie(c, pos) = ChooseExistingC(thread); return RangeScanTxn(thread, read_opts, c); } @@ -403,27 +498,23 @@ Status MultiOpsTxnsStressTest::TestCustomOperations( Status s; if (0 == rand) { // Update primary key. 
- uint32_t old_a = ChooseA(thread); - uint32_t new_a = GenerateNextA(); - s = PrimaryKeyUpdateTxn(thread, old_a, new_a); + uint32_t old_a = 0; + uint32_t pos = 0; + std::tie(old_a, pos) = ChooseExistingA(thread); + uint32_t new_a = GenerateNextA(thread); + s = PrimaryKeyUpdateTxn(thread, old_a, pos, new_a); } else if (1 == rand) { // Update secondary key. - uint32_t old_c = thread->rand.Next() % kInitNumC; - int count = 0; - uint32_t new_c = 0; - do { - ++count; - new_c = thread->rand.Next() % kInitNumC; - } while (count < 100 && new_c == old_c); - if (count >= 100) { - // If we reach here, it means our random number generator has a serious - // problem, or kInitNumC is chosen poorly. - std::terminate(); - } - s = SecondaryKeyUpdateTxn(thread, old_c, new_c); + uint32_t old_c = 0; + uint32_t pos = 0; + std::tie(old_c, pos) = ChooseExistingC(thread); + uint32_t new_c = GenerateNextC(thread); + s = SecondaryKeyUpdateTxn(thread, old_c, pos, new_c); } else if (2 == rand) { // Update primary index value. - uint32_t a = ChooseA(thread); + uint32_t a = 0; + uint32_t pos = 0; + std::tie(a, pos) = ChooseExistingA(thread); s = UpdatePrimaryIndexValueTxn(thread, a, /*b_delta=*/1); } else { // Should never reach here. @@ -434,10 +525,12 @@ Status MultiOpsTxnsStressTest::TestCustomOperations( Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, + uint32_t old_a_pos, uint32_t new_a) { #ifdef ROCKSDB_LITE (void)thread; (void)old_a; + (void)old_a_pos; (void)new_a; return Status::NotSupported(); #else @@ -455,7 +548,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, assert(txn); txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr); - const Defer cleanup([&s, thread, txn, this]() { + const Defer cleanup([new_a, &s, thread, txn, this]() { if (s.ok()) { // Two gets, one for existing pk, one for locking potential new pk. 
thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1); @@ -473,6 +566,8 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, } else { thread->stats.AddErrors(1); } + auto& key_gen = key_gen_for_a_[thread->tid]; + key_gen->UndoAllocation(new_a); RollbackTxn(txn).PermitUncheckedError(); }); @@ -490,6 +585,8 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, assert(!empty_value.empty()); s = Status::Busy(); return s; + } else if (!s.IsNotFound()) { + return s; } auto result = Record::DecodePrimaryIndexValue(value); @@ -528,17 +625,31 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, return s; } - s = CommitTxn(txn); + s = txn->Prepare(); + + if (!s.ok()) { + return s; + } + + s = txn->Commit(); + + auto& key_gen = key_gen_for_a_.at(thread->tid); + if (s.ok()) { + delete txn; + key_gen->Replace(old_a, old_a_pos, new_a); + } return s; #endif // !ROCKSDB_LITE } Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c, + uint32_t old_c_pos, uint32_t new_c) { #ifdef ROCKSDB_LITE (void)thread; (void)old_c; + (void)old_c_pos; (void)new_c; return Status::NotSupported(); #else @@ -555,7 +666,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, Iterator* it = nullptr; long iterations = 0; - const Defer cleanup([&s, thread, &it, txn, this, &iterations]() { + const Defer cleanup([new_c, &s, thread, &it, txn, this, &iterations]() { delete it; if (s.ok()) { thread->stats.AddIterations(iterations); @@ -577,6 +688,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, } else { thread->stats.AddErrors(1); } + auto& key_gen = key_gen_for_c_[thread->tid]; + key_gen->UndoAllocation(new_c); RollbackTxn(txn).PermitUncheckedError(); }); @@ -593,9 +706,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, std::string iter_ub_str = Record::EncodeSecondaryKey(old_c + 1); Slice iter_ub = iter_ub_str; ReadOptions ropts; - if 
(thread->rand.OneIn(2)) { - ropts.snapshot = txn->GetSnapshot(); - } + ropts.snapshot = txn->GetSnapshot(); ropts.total_order_seek = true; ropts.iterate_upper_bound = &iter_ub; ropts.rate_limiter_priority = @@ -616,7 +727,9 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, Record record; s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); if (!s.ok()) { - VerificationAbort(thread->shared, "Cannot decode secondary key", s); + fprintf(stderr, "Cannot decode secondary key: %s\n", + s.ToString().c_str()); + assert(false); break; } // At this point, record.b is not known yet, thus we need to access @@ -633,15 +746,25 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, // Write conflict, or cannot acquire lock, or memtable size is not large // enough, or merge cannot be resolved. break; - } else if (!s.ok()) { + } else if (s.IsNotFound()) { // We can also fail verification here. - VerificationAbort(thread->shared, "pk should exist, but does not", s); + std::ostringstream oss; + oss << "pk should exist: " << Slice(pk).ToString(true); + fprintf(stderr, "%s\n", oss.str().c_str()); + assert(false); + break; + } + if (!s.ok()) { + fprintf(stderr, "%s\n", s.ToString().c_str()); + assert(false); break; } auto result = Record::DecodePrimaryIndexValue(value); s = std::get<0>(result); if (!s.ok()) { - VerificationAbort(thread->shared, "Cannot decode primary index value", s); + fprintf(stderr, "Cannot decode primary index value: %s\n", + s.ToString().c_str()); + assert(false); break; } uint32_t b = std::get<1>(result); @@ -651,7 +774,8 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, oss << "c in primary index does not match secondary index: " << c << " != " << old_c; s = Status::Corruption(); - VerificationAbort(thread->shared, oss.str(), s); + fprintf(stderr, "%s\n", oss.str().c_str()); + assert(false); break; } Record new_rec(record.a_value(), b, new_c); @@ -681,7 +805,19 @@ Status 
MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, return s; } - s = CommitTxn(txn); + s = txn->Prepare(); + + if (!s.ok()) { + return s; + } + + s = txn->Commit(); + + if (s.ok()) { + delete txn; + auto& key_gen = key_gen_for_c_.at(thread->tid); + key_gen->Replace(old_c, old_c_pos, new_c); + } return s; #endif // !ROCKSDB_LITE @@ -737,6 +873,10 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, } auto result = Record::DecodePrimaryIndexValue(value); if (!std::get<0>(result).ok()) { + s = std::get<0>(result); + fprintf(stderr, "Cannot decode primary index value: %s\n", + s.ToString().c_str()); + assert(false); return s; } uint32_t b = std::get<1>(result) + b_delta; @@ -748,7 +888,14 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, if (!s.ok()) { return s; } - s = CommitTxn(txn); + s = txn->Prepare(); + if (!s.ok()) { + return s; + } + s = txn->Commit(); + if (s.ok()) { + delete txn; + } return s; #endif // !ROCKSDB_LITE } @@ -788,10 +935,23 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, RollbackTxn(txn).PermitUncheckedError(); }); + txn->SetSnapshot(); + ropts.snapshot = txn->GetSnapshot(); + + if (FLAGS_delay_snapshot_read_one_in > 0 && + thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { + uint64_t delay_ms = thread->rand.Uniform(100) + 1; + db_->GetDBOptions().env->SleepForMicroseconds( + static_cast(delay_ms * 1000)); + } + s = txn->Get(ropts, db_->DefaultColumnFamily(), pk_str, &value); if (s.ok()) { s = txn->Commit(); } + if (s.ok()) { + delete txn; + } return s; #endif // !ROCKSDB_LITE } @@ -825,14 +985,36 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, thread->stats.AddErrors(1); RollbackTxn(txn).PermitUncheckedError(); }); + + txn->SetSnapshot(); + ropts.snapshot = txn->GetSnapshot(); + + if (FLAGS_delay_snapshot_read_one_in > 0 && + thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { + uint64_t delay_ms = 
thread->rand.Uniform(100) + 1; + db_->GetDBOptions().env->SleepForMicroseconds( + static_cast(delay_ms * 1000)); + } + std::unique_ptr iter(txn->GetIterator(ropts)); - iter->Seek(sk); + + constexpr size_t total_nexts = 10; + size_t nexts = 0; + for (iter->Seek(sk); + iter->Valid() && nexts < total_nexts && iter->status().ok(); + iter->Next(), ++nexts) { + } + if (iter->status().ok()) { s = txn->Commit(); } else { s = iter->status(); } - // TODO (yanqin) more Seek/SeekForPrev/Next/Prev/SeekToFirst/SeekToLast + + if (s.ok()) { + delete txn; + } + return s; #endif // !ROCKSDB_LITE } @@ -845,6 +1027,21 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { assert(snapshot); ManagedSnapshot snapshot_guard(db_, snapshot); + std::ostringstream oss; + oss << "[snap=" << snapshot->GetSequenceNumber() << ","; + + auto* dbimpl = static_cast_with_check(db_->GetRootDB()); + assert(dbimpl); + + oss << " last_published=" << dbimpl->GetLastPublishedSequence() << "] "; + + if (FLAGS_delay_snapshot_read_one_in > 0 && + thread->rand.OneIn(FLAGS_delay_snapshot_read_one_in)) { + uint64_t delay_ms = thread->rand.Uniform(100) + 1; + db_->GetDBOptions().env->SleepForMicroseconds( + static_cast(delay_ms * 1000)); + } + // TODO (yanqin) with a probability, we can use either forward or backward // iterator in subsequent checks. We can also use more advanced features in // range scan. For now, let's just use simple forward iteration with @@ -868,7 +1065,36 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { std::unique_ptr it(db_->NewIterator(ropts)); for (it->SeekToFirst(); it->Valid(); it->Next()) { + Record record; + Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); + if (!s.ok()) { + oss << "Cannot decode primary index entry " << it->key().ToString(true) + << "=>" << it->value().ToString(true); + VerificationAbort(thread->shared, oss.str(), s); + assert(false); + return; + } ++primary_index_entries_count; + + // Search secondary index. 
+ uint32_t a = record.a_value(); + uint32_t c = record.c_value(); + char sk_buf[12]; + EncodeFixed32(sk_buf, Record::kSecondaryIndexId); + std::reverse(sk_buf, sk_buf + sizeof(uint32_t)); + EncodeFixed32(sk_buf + sizeof(uint32_t), c); + std::reverse(sk_buf + sizeof(uint32_t), sk_buf + 2 * sizeof(uint32_t)); + EncodeFixed32(sk_buf + 2 * sizeof(uint32_t), a); + std::reverse(sk_buf + 2 * sizeof(uint32_t), sk_buf + sizeof(sk_buf)); + Slice sk(sk_buf, sizeof(sk_buf)); + std::string value; + s = db_->Get(ropts, sk, &value); + if (!s.ok()) { + oss << "Cannot find secondary index entry " << sk.ToString(true); + VerificationAbort(thread->shared, oss.str(), s); + assert(false); + return; + } } } @@ -892,8 +1118,9 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { Record record; Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); if (!s.ok()) { - VerificationAbort(thread->shared, "Cannot decode secondary index entry", - s); + oss << "Cannot decode secondary index entry"; + VerificationAbort(thread->shared, oss.str(), s); + assert(false); return; } // After decoding secondary index entry, we know a and c. Crc is verified @@ -904,104 +1131,381 @@ void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { std::string value; s = db_->Get(ropts, pk, &value); if (!s.ok()) { - std::ostringstream oss; oss << "Error searching pk " << Slice(pk).ToString(true) << ". " << s.ToString(); VerificationAbort(thread->shared, oss.str(), s); + assert(false); return; } auto result = Record::DecodePrimaryIndexValue(value); s = std::get<0>(result); if (!s.ok()) { - std::ostringstream oss; oss << "Error decoding primary index value " << Slice(value).ToString(true) << ". " << s.ToString(); VerificationAbort(thread->shared, oss.str(), s); + assert(false); + return; } uint32_t c_in_primary = std::get<2>(result); if (c_in_primary != record.c_value()) { - std::ostringstream oss; oss << "Pk/sk mismatch. 
pk: (c=" << c_in_primary << "), sk: (c=" << record.c_value() << ")"; VerificationAbort(thread->shared, oss.str(), s); + assert(false); + return; } } } if (secondary_index_entries_count != primary_index_entries_count) { - std::ostringstream oss; oss << "Pk/sk mismatch: primary index has " << primary_index_entries_count << " entries. Secondary index has " << secondary_index_entries_count << " entries."; VerificationAbort(thread->shared, oss.str(), Status::OK()); + assert(false); + return; } } -uint32_t MultiOpsTxnsStressTest::ChooseA(ThreadState* thread) { - uint32_t rnd = thread->rand.Uniform(5); - uint32_t next_a_low = next_a_.load(std::memory_order_relaxed); - assert(next_a_low != 0); - if (rnd == 0) { - return next_a_low - 1; - } +std::pair MultiOpsTxnsStressTest::ChooseExistingA( + ThreadState* thread) { + uint32_t tid = thread->tid; + auto& key_gen = key_gen_for_a_.at(tid); + return key_gen->ChooseExisting(); +} + +uint32_t MultiOpsTxnsStressTest::GenerateNextA(ThreadState* thread) { + uint32_t tid = thread->tid; + auto& key_gen = key_gen_for_a_.at(tid); + return key_gen->Allocate(); +} + +std::pair MultiOpsTxnsStressTest::ChooseExistingC( + ThreadState* thread) { + uint32_t tid = thread->tid; + auto& key_gen = key_gen_for_c_.at(tid); + return key_gen->ChooseExisting(); +} - uint32_t result = 0; - result = thread->rand.Next() % next_a_low; - if (thread->rand.OneIn(3)) { - return result; +uint32_t MultiOpsTxnsStressTest::GenerateNextC(ThreadState* thread) { + uint32_t tid = thread->tid; + auto& key_gen = key_gen_for_c_.at(tid); + return key_gen->Allocate(); +} + +std::string MultiOpsTxnsStressTest::KeySpaces::EncodeTo() const { + std::string result; + PutFixed32(&result, lb_a); + PutFixed32(&result, ub_a); + PutFixed32(&result, lb_c); + PutFixed32(&result, ub_c); + return result; +} + +bool MultiOpsTxnsStressTest::KeySpaces::DecodeFrom(Slice data) { + if (!GetFixed32(&data, &lb_a) || !GetFixed32(&data, &ub_a) || + !GetFixed32(&data, &lb_c) || !GetFixed32(&data, 
&ub_c)) { + return false; } - uint32_t next_a_high = next_a_.load(std::memory_order_relaxed); - // A higher chance that this a still exists. - return next_a_low + (next_a_high - next_a_low) / 2; + return true; +} + +void MultiOpsTxnsStressTest::PersistKeySpacesDesc( + const std::string& key_spaces_path, uint32_t lb_a, uint32_t ub_a, + uint32_t lb_c, uint32_t ub_c) { + KeySpaces key_spaces(lb_a, ub_a, lb_c, ub_c); + std::string key_spaces_rep = key_spaces.EncodeTo(); + + std::unique_ptr wfile; + Status s1 = + Env::Default()->NewWritableFile(key_spaces_path, &wfile, EnvOptions()); + assert(s1.ok()); + assert(wfile); + s1 = wfile->Append(key_spaces_rep); + assert(s1.ok()); } -uint32_t MultiOpsTxnsStressTest::GenerateNextA() { - return next_a_.fetch_add(1, std::memory_order_relaxed); +MultiOpsTxnsStressTest::KeySpaces MultiOpsTxnsStressTest::ReadKeySpacesDesc( + const std::string& key_spaces_path) { + KeySpaces key_spaces; + std::unique_ptr sfile; + Status s1 = + Env::Default()->NewSequentialFile(key_spaces_path, &sfile, EnvOptions()); + assert(s1.ok()); + assert(sfile); + char buf[16]; + Slice result; + s1 = sfile->Read(sizeof(buf), &result, buf); + assert(s1.ok()); + if (!key_spaces.DecodeFrom(result)) { + assert(false); + } + return key_spaces; } -void MultiOpsTxnsStressTest::PreloadDb(SharedState* shared, size_t num_c) { +// Create an empty database if necessary and preload it with initial test data. +// Key range [lb_a, ub_a), [lb_c, ub_c). The key ranges will be shared by +// 'threads' threads. +// PreloadDb() also sets up KeyGenerator objects for each sub key range +// operated on by each thread. +// Both [lb_a, ub_a) and [lb_c, ub_c) are partitioned. Each thread operates on +// one sub range, using KeyGenerators to generate keys. +// For example, we choose a from [0, 10000) and c from [0, 100). Number of +// threads is 32, their tids range from 0 to 31. +// Thread k chooses a from [312*k,312*(k+1)) and c from [3*k,3*(k+1)) if k<31. 
+// Thread 31 chooses a from [9672, 10000) and c from [93, 100). +// Within each subrange: a from [low1, high1), c from [low2, high2). +// high1 - low1 > high2 - low2 +// We reserve {high1 - 1} and {high2 - 1} as unallocated. +// The records are , , ..., +// , ... +void MultiOpsTxnsStressTest::PreloadDb(SharedState* shared, int threads, + uint32_t lb_a, uint32_t ub_a, + uint32_t lb_c, uint32_t ub_c) { #ifdef ROCKSDB_LITE (void)shared; - (void)num_c; + (void)threads; + (void)lb_a; + (void)ub_a; + (void)lb_c; + (void)ub_c; #else - // TODO (yanqin) maybe parallelize. Currently execute in single thread. + key_gen_for_a_.resize(threads); + key_gen_for_c_.resize(threads); + + assert(ub_a > lb_a && ub_a > lb_a + threads); + assert(ub_c > lb_c && ub_c > lb_c + threads); + + PersistKeySpacesDesc(FLAGS_key_spaces_path, lb_a, ub_a, lb_c, ub_c); + + fprintf(stdout, "a from [%u, %u), c from [%u, %u)\n", + static_cast(lb_a), static_cast(ub_a), + static_cast(lb_c), static_cast(ub_c)); + + const uint32_t num_c = ub_c - lb_c; + const uint32_t num_c_per_thread = num_c / threads; + const uint32_t num_a = ub_a - lb_a; + const uint32_t num_a_per_thread = num_a / threads; + WriteOptions wopts; - wopts.disableWAL = true; - wopts.sync = false; + wopts.disableWAL = FLAGS_disable_wal; Random rnd(shared->GetSeed()); assert(txn_db_); - for (uint32_t c = 0; c < static_cast(num_c); ++c) { - for (uint32_t a = c * kInitialCARatio; a < ((c + 1) * kInitialCARatio); - ++a) { - Record record(a, /*_b=*/rnd.Next(), c); - WriteBatch wb; - const auto primary_index_entry = record.EncodePrimaryIndexEntry(); - Status s = wb.Put(primary_index_entry.first, primary_index_entry.second); - assert(s.ok()); - const auto secondary_index_entry = record.EncodeSecondaryIndexEntry(); - s = wb.Put(secondary_index_entry.first, secondary_index_entry.second); - assert(s.ok()); - s = txn_db_->Write(wopts, &wb); - assert(s.ok()); - - // TODO (yanqin): make the following check optional, especially when data - // size is 
large. - Record tmp_rec; - tmp_rec.SetB(record.b_value()); - s = tmp_rec.DecodeSecondaryIndexEntry(secondary_index_entry.first, - secondary_index_entry.second); - assert(s.ok()); - assert(tmp_rec == record); + + std::vector existing_a_uniqs(threads); + std::vector non_existing_a_uniqs(threads); + std::vector existing_c_uniqs(threads); + std::vector non_existing_c_uniqs(threads); + + for (uint32_t a = lb_a; a < ub_a; ++a) { + uint32_t tid = (a - lb_a) / num_a_per_thread; + if (tid >= static_cast(threads)) { + tid = threads - 1; } + + uint32_t a_base = lb_a + tid * num_a_per_thread; + uint32_t a_hi = (tid < static_cast(threads - 1)) + ? (a_base + num_a_per_thread) + : ub_a; + uint32_t a_delta = a - a_base; + + if (a == a_hi - 1) { + non_existing_a_uniqs[tid].insert(a); + continue; + } + + uint32_t c_base = lb_c + tid * num_c_per_thread; + uint32_t c_hi = (tid < static_cast(threads - 1)) + ? (c_base + num_c_per_thread) + : ub_c; + uint32_t c_delta = a_delta % (c_hi - c_base - 1); + uint32_t c = c_base + c_delta; + + uint32_t b = rnd.Next(); + Record record(a, b, c); + WriteBatch wb; + const auto primary_index_entry = record.EncodePrimaryIndexEntry(); + Status s = wb.Put(primary_index_entry.first, primary_index_entry.second); + assert(s.ok()); + + const auto secondary_index_entry = record.EncodeSecondaryIndexEntry(); + s = wb.Put(secondary_index_entry.first, secondary_index_entry.second); + assert(s.ok()); + + s = txn_db_->Write(wopts, &wb); + assert(s.ok()); + + // TODO (yanqin): make the following check optional, especially when data + // size is large. 
+ Record tmp_rec; + tmp_rec.SetB(record.b_value()); + s = tmp_rec.DecodeSecondaryIndexEntry(secondary_index_entry.first, + secondary_index_entry.second); + assert(s.ok()); + assert(tmp_rec == record); + + existing_a_uniqs[tid].insert(a); + existing_c_uniqs[tid].insert(c); + } + + for (int i = 0; i < threads; ++i) { + uint32_t my_seed = i + shared->GetSeed(); + + auto& key_gen_for_a = key_gen_for_a_[i]; + assert(!key_gen_for_a); + uint32_t low = lb_a + i * num_a_per_thread; + uint32_t high = (i < threads - 1) ? (low + num_a_per_thread) : ub_a; + assert(existing_a_uniqs[i].size() == high - low - 1); + assert(non_existing_a_uniqs[i].size() == 1); + key_gen_for_a = std::make_unique( + my_seed, low, high, std::move(existing_a_uniqs[i]), + std::move(non_existing_a_uniqs[i])); + + auto& key_gen_for_c = key_gen_for_c_[i]; + assert(!key_gen_for_c); + low = lb_c + i * num_c_per_thread; + high = (i < threads - 1) ? (low + num_c_per_thread) : ub_c; + non_existing_c_uniqs[i].insert(high - 1); + assert(existing_c_uniqs[i].size() == high - low - 1); + assert(non_existing_c_uniqs[i].size() == 1); + key_gen_for_c = std::make_unique( + my_seed, low, high, std::move(existing_c_uniqs[i]), + std::move(non_existing_c_uniqs[i])); } - Status s = db_->Flush(FlushOptions()); - assert(s.ok()); - next_a_.store(static_cast((num_c + 1) * kInitialCARatio)); - fprintf(stdout, "DB preloaded with %d entries\n", - static_cast(num_c * kInitialCARatio)); #endif // !ROCKSDB_LITE } +// Scan an existing, non-empty database. +// Set up [lb_a, ub_a) and [lb_c, ub_c) as test key ranges. +// Set up KeyGenerator objects for each sub key range operated on by each +// thread. +// Scan the entire database and for each subrange, populate the existing keys +// and non-existing keys. We currently require the non-existing keys be +// non-empty after initialization. 
+void MultiOpsTxnsStressTest::ScanExistingDb(SharedState* shared, int threads) { + key_gen_for_a_.resize(threads); + key_gen_for_c_.resize(threads); + + KeySpaces key_spaces = ReadKeySpacesDesc(FLAGS_key_spaces_path); + + const uint32_t lb_a = key_spaces.lb_a; + const uint32_t ub_a = key_spaces.ub_a; + const uint32_t lb_c = key_spaces.lb_c; + const uint32_t ub_c = key_spaces.ub_c; + + assert(lb_a < ub_a && lb_c < ub_c); + + fprintf(stdout, "a from [%u, %u), c from [%u, %u)\n", + static_cast(lb_a), static_cast(ub_a), + static_cast(lb_c), static_cast(ub_c)); + + assert(ub_a > lb_a && ub_a > lb_a + threads); + assert(ub_c > lb_c && ub_c > lb_c + threads); + + const uint32_t num_c = ub_c - lb_c; + const uint32_t num_c_per_thread = num_c / threads; + const uint32_t num_a = ub_a - lb_a; + const uint32_t num_a_per_thread = num_a / threads; + + assert(db_); + ReadOptions ropts; + std::vector existing_a_uniqs(threads); + std::vector non_existing_a_uniqs(threads); + std::vector existing_c_uniqs(threads); + std::vector non_existing_c_uniqs(threads); + { + std::string pk_lb_str = Record::EncodePrimaryKey(0); + std::string pk_ub_str = + Record::EncodePrimaryKey(std::numeric_limits::max()); + Slice pk_lb = pk_lb_str; + Slice pk_ub = pk_ub_str; + ropts.iterate_lower_bound = &pk_lb; + ropts.iterate_upper_bound = &pk_ub; + ropts.total_order_seek = true; + std::unique_ptr it(db_->NewIterator(ropts)); + + for (it->SeekToFirst(); it->Valid(); it->Next()) { + Record record; + Status s = record.DecodePrimaryIndexEntry(it->key(), it->value()); + if (!s.ok()) { + fprintf(stderr, "Cannot decode primary index entry: %s\n", + s.ToString().c_str()); + assert(false); + } + uint32_t a = record.a_value(); + assert(a >= lb_a); + assert(a < ub_a); + uint32_t tid = (a - lb_a) / num_a_per_thread; + if (tid >= static_cast(threads)) { + tid = threads - 1; + } + + existing_a_uniqs[tid].insert(a); + + uint32_t c = record.c_value(); + assert(c >= lb_c); + assert(c < ub_c); + tid = (c - lb_c) / 
num_c_per_thread; + if (tid >= static_cast(threads)) { + tid = threads - 1; + } + auto& existing_c_uniq = existing_c_uniqs[tid]; + existing_c_uniq.insert(c); + } + + for (uint32_t a = lb_a; a < ub_a; ++a) { + uint32_t tid = (a - lb_a) / num_a_per_thread; + if (tid >= static_cast(threads)) { + tid = threads - 1; + } + if (0 == existing_a_uniqs[tid].count(a)) { + non_existing_a_uniqs[tid].insert(a); + } + } + + for (uint32_t c = lb_c; c < ub_c; ++c) { + uint32_t tid = (c - lb_c) / num_c_per_thread; + if (tid >= static_cast(threads)) { + tid = threads - 1; + } + if (0 == existing_c_uniqs[tid].count(c)) { + non_existing_c_uniqs[tid].insert(c); + } + } + + for (int i = 0; i < threads; ++i) { + uint32_t my_seed = i + shared->GetSeed(); + auto& key_gen_for_a = key_gen_for_a_[i]; + assert(!key_gen_for_a); + uint32_t low = lb_a + i * num_a_per_thread; + uint32_t high = (i < threads - 1) ? (low + num_a_per_thread) : ub_a; + + // The following two assertions assume the test thread count and key + // space remain the same across different runs. Will need to relax. + assert(existing_a_uniqs[i].size() == high - low - 1); + assert(non_existing_a_uniqs[i].size() == 1); + + key_gen_for_a = std::make_unique( + my_seed, low, high, std::move(existing_a_uniqs[i]), + std::move(non_existing_a_uniqs[i])); + + auto& key_gen_for_c = key_gen_for_c_[i]; + assert(!key_gen_for_c); + low = lb_c + i * num_c_per_thread; + high = (i < threads - 1) ? (low + num_c_per_thread) : ub_c; + + // The following two assertions assume the test thread count and key + // space remain the same across different runs. Will need to relax. 
+ assert(existing_c_uniqs[i].size() == high - low - 1); + assert(non_existing_c_uniqs[i].size() == 1); + + key_gen_for_c = std::make_unique( + my_seed, low, high, std::move(existing_c_uniqs[i]), + std::move(non_existing_c_uniqs[i])); + } + } +} + StressTest* CreateMultiOpsTxnsStressTest() { return new MultiOpsTxnsStressTest(); } @@ -1017,6 +1521,12 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() { if (!FLAGS_use_txn) { fprintf(stderr, "-use_txn must be true if -test_multi_ops_txns\n"); exit(1); + } else if (FLAGS_test_secondary > 0) { + fprintf( + stderr, + "secondary instance does not support replaying logs (MANIFEST + WAL) " + "of TransactionDB with write-prepared/write-unprepared policy\n"); + exit(1); } if (FLAGS_clear_column_family_one_in > 0) { fprintf(stderr, @@ -1039,6 +1549,12 @@ void CheckAndSetOptionsForMultiOpsTxnStressTest() { "-delrangepercent be 0\n"); exit(1); } + if (FLAGS_key_spaces_path.empty()) { + fprintf(stderr, + "Must specify a file to store ranges of A and C via " + "-key_spaces_path\n"); + exit(1); + } #else fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n"); exit(1); diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h index ac74e3f8e..47b438875 100644 --- a/db_stress_tool/multi_ops_txns_stress.h +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -196,7 +196,7 @@ class MultiOpsTxnsStressTest : public StressTest { void FinishInitDb(SharedState*) override; - void ReopenAndPreloadDb(SharedState* shared); + void ReopenAndPreloadDbIfNeeded(SharedState* shared); bool IsStateTracked() const override { return false; } @@ -262,10 +262,10 @@ class MultiOpsTxnsStressTest : public StressTest { const std::vector& rand_column_families) override; Status PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, - uint32_t new_a); + uint32_t old_a_pos, uint32_t new_a); Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c, - uint32_t new_c); + uint32_t old_c_pos, uint32_t 
new_c); Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a, uint32_t b_delta); @@ -276,16 +276,88 @@ class MultiOpsTxnsStressTest : public StressTest { void VerifyDb(ThreadState* thread) const override; + void ContinuouslyVerifyDb(ThreadState* thread) const override { + VerifyDb(thread); + } + protected: - uint32_t ChooseA(ThreadState* thread); + using KeySet = std::set; + class KeyGenerator { + public: + explicit KeyGenerator(uint32_t s, uint32_t low, uint32_t high, + KeySet&& existing_uniq, KeySet&& non_existing_uniq) + : rand_(s), + low_(low), + high_(high), + existing_uniq_(std::move(existing_uniq)), + non_existing_uniq_(std::move(non_existing_uniq)) {} + ~KeyGenerator() { + assert(!existing_uniq_.empty()); + assert(!non_existing_uniq_.empty()); + } + void FinishInit(); + + std::pair ChooseExisting(); + void Replace(uint32_t old_val, uint32_t old_pos, uint32_t new_val); + uint32_t Allocate(); + void UndoAllocation(uint32_t new_val); + + std::string ToString() const { + std::ostringstream oss; + oss << "[" << low_ << ", " << high_ << "): " << existing_.size() + << " elements, " << existing_uniq_.size() << " unique values, " + << non_existing_uniq_.size() << " unique non-existing values"; + return oss.str(); + } + + private: + Random rand_; + uint32_t low_ = 0; + uint32_t high_ = 0; + std::vector existing_{}; + KeySet existing_uniq_{}; + KeySet non_existing_uniq_{}; + bool initialized_ = false; + }; + + // Return + std::pair ChooseExistingA(ThreadState* thread); + + uint32_t GenerateNextA(ThreadState* thread); + + // Return + std::pair ChooseExistingC(ThreadState* thread); + + uint32_t GenerateNextC(ThreadState* thread); - uint32_t GenerateNextA(); + std::vector> key_gen_for_a_; + std::vector> key_gen_for_c_; private: - void PreloadDb(SharedState* shared, size_t num_c); + struct KeySpaces { + uint32_t lb_a = 0; + uint32_t ub_a = 0; + uint32_t lb_c = 0; + uint32_t ub_c = 0; + + explicit KeySpaces() = default; + explicit KeySpaces(uint32_t _lb_a, 
uint32_t _ub_a, uint32_t _lb_c, + uint32_t _ub_c) + : lb_a(_lb_a), ub_a(_ub_a), lb_c(_lb_c), ub_c(_ub_c) {} + + std::string EncodeTo() const; + bool DecodeFrom(Slice data); + }; + + void PersistKeySpacesDesc(const std::string& key_spaces_path, uint32_t lb_a, + uint32_t ub_a, uint32_t lb_c, uint32_t ub_c); + + KeySpaces ReadKeySpacesDesc(const std::string& key_spaces_path); + + void PreloadDb(SharedState* shared, int threads, uint32_t lb_a, uint32_t ub_a, + uint32_t lb_c, uint32_t ub_c); - // TODO (yanqin) encapsulate the selection of keys a separate class. - std::atomic next_a_{0}; + void ScanExistingDb(SharedState* shared, int threads); }; class InvariantChecker { diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 276302be8..2a60b9707 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -226,8 +226,12 @@ struct TransactionDBOptions { private: // 128 entries + // Should the default value change, please also update wp_snapshot_cache_bits + // in db_stress_gflags.cc size_t wp_snapshot_cache_bits = static_cast(7); // 8m entry, 64MB size + // Should the default value change, please also update wp_commit_cache_bits + // in db_stress_gflags.cc size_t wp_commit_cache_bits = static_cast(23); // For testing, whether transaction name should be auto-generated or not. 
This @@ -239,6 +243,7 @@ struct TransactionDBOptions { friend class WritePreparedTransactionTestBase; friend class TransactionTestBase; friend class MySQLStyleTransactionTest; + friend class StressTest; }; struct TransactionOptions { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index bd133422d..b6e1c8ac2 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -24,6 +24,10 @@ import argparse # cf_consistency_params < args # for txn: # default_params < {blackbox,whitebox}_default_params < txn_params < args +# for ts: +# default_params < {blackbox,whitebox}_default_params < ts_params < args +# for multiops_txn: +# default_params < {blackbox,whitebox}_default_params < multiops_txn_params < args default_params = { @@ -199,6 +203,21 @@ def setup_expected_values_dir(): os.mkdir(expected_values_dir) return expected_values_dir +multiops_txn_key_spaces_file = None +def setup_multiops_txn_key_spaces_file(): + global multiops_txn_key_spaces_file + if multiops_txn_key_spaces_file is not None: + return multiops_txn_key_spaces_file + key_spaces_file_prefix = "rocksdb_crashtest_multiops_txn_key_spaces" + test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR) + if test_tmpdir is None or test_tmpdir == "": + multiops_txn_key_spaces_file = tempfile.mkstemp( + prefix=key_spaces_file_prefix)[1] + else: + multiops_txn_key_spaces_file = tempfile.mkstemp( + prefix=key_spaces_file_prefix, dir=test_tmpdir)[1] + return multiops_txn_key_spaces_file + def is_direct_io_supported(dbname): with tempfile.NamedTemporaryFile(dir=dbname) as f: @@ -323,6 +342,61 @@ ts_params = { "use_block_based_filter": 0, } +multiops_txn_default_params = { + "test_cf_consistency": 0, + "test_batches_snapshots": 0, + "test_multi_ops_txns": 1, + "use_txn": 1, + "two_write_queues": lambda: random.choice([0, 1]), + # TODO: enable write-prepared + "disable_wal": 0, + "use_only_the_last_commit_time_batch_for_recovery": lambda: random.choice([0, 1]), + "clear_column_family_one_in": 0, + "column_families": 
1, + "enable_pipelined_write": lambda: random.choice([0, 1]), + # This test already acquires snapshots in reads + "acquire_snapshot_one_in": 0, + "backup_one_in": 0, + "writepercent": 0, + "delpercent": 0, + "delrangepercent": 0, + "customopspercent": 80, + "readpercent": 5, + "iterpercent": 15, + "prefixpercent": 0, + "verify_db_one_in": 1000, + "continuous_verification_interval": 1000, + "delay_snapshot_read_one_in": 3, + # 65536 is the smallest possible value for write_buffer_size. Smaller + # values will be sanitized to 65536 during db open. SetOptions currently + # does not sanitize options, but very small write_buffer_size may cause + # assertion failure in + # https://github.com/facebook/rocksdb/blob/7.0.fb/db/memtable.cc#L117. + "write_buffer_size": 65536, + # flush more frequently to generate more files, thus trigger more + # compactions. + "flush_one_in": 1000, + "key_spaces_path": setup_multiops_txn_key_spaces_file(), +} + +multiops_wc_txn_params = { + "txn_write_policy": 0, + # TODO re-enable pipelined write. Not well tested atm + "enable_pipelined_write": 0, +} + +multiops_wp_txn_params = { + "txn_write_policy": 1, + "wp_snapshot_cache_bits": 1, + # try small wp_commit_cache_bits, e.g. 
0 once we explore storing full + commit sequence numbers in commit cache + "wp_commit_cache_bits": 10, + # pipeline write is not currently compatible with WritePrepared txns + "enable_pipelined_write": 0, + # OpenReadOnly after checkpoint is not currently compatible with WritePrepared txns + "checkpoint_one_in": 0, +} + def finalize_and_sanitize(src_params): dest_params = dict([(k, v() if callable(v) else v) for (k, v) in src_params.items()]) @@ -407,6 +481,8 @@ def finalize_and_sanitize(src_params): if (dest_params.get("prefix_size") == -1 and dest_params.get("memtable_whole_key_filtering") == 0): dest_params["memtable_prefix_bloom_size_ratio"] = 0 + if dest_params.get("two_write_queues") == 1: + dest_params["enable_pipelined_write"] = 0 return dest_params def gen_cmd_params(args): @@ -431,6 +507,12 @@ def gen_cmd_params(args): params.update(best_efforts_recovery_params) if args.enable_ts: params.update(ts_params) + if args.test_multiops_txn: + params.update(multiops_txn_default_params) + if args.write_policy == 'write_committed': + params.update(multiops_wc_txn_params) + elif args.write_policy == 'write_prepared': + params.update(multiops_wp_txn_params) # Best-effort recovery and BlobDB are currently incompatible. 
Test BE recovery # if specified on the command line; otherwise, apply BlobDB related overrides @@ -453,7 +535,8 @@ def gen_cmd(params, unknown_params): for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)] if k not in set(['test_type', 'simple', 'duration', 'interval', 'random_kill_odd', 'cf_consistency', 'txn', - 'test_best_efforts_recovery', 'enable_ts', 'stress_cmd']) + 'test_best_efforts_recovery', 'enable_ts', + 'test_multiops_txn', 'write_policy', 'stress_cmd']) and v is not None] + unknown_params return cmd @@ -713,6 +796,8 @@ def main(): parser.add_argument("--txn", action='store_true') parser.add_argument("--test_best_efforts_recovery", action='store_true') parser.add_argument("--enable_ts", action='store_true') + parser.add_argument("--test_multiops_txn", action='store_true') + parser.add_argument("--write_policy", choices=["write_committed", "write_prepared"]) parser.add_argument("--stress_cmd") all_params = dict(list(default_params.items()) @@ -722,7 +807,10 @@ def main(): + list(blackbox_simple_default_params.items()) + list(whitebox_simple_default_params.items()) + list(blob_params.items()) - + list(ts_params.items())) + + list(ts_params.items()) + + list(multiops_txn_default_params.items()) + + list(multiops_wc_txn_params.items()) + + list(multiops_wp_txn_params.items())) for k, v in all_params.items(): parser.add_argument("--" + k, type=type(v() if callable(v) else v)) @@ -744,6 +832,8 @@ def main(): # Only delete the `expected_values_dir` if test passes if expected_values_dir is not None: shutil.rmtree(expected_values_dir) + if multiops_txn_key_spaces_file is not None: + os.remove(multiops_txn_key_spaces_file) if __name__ == '__main__':