diff --git a/TARGETS b/TARGETS index c33545cfd..315f29d3e 100644 --- a/TARGETS +++ b/TARGETS @@ -875,6 +875,7 @@ cpp_library( "db_stress_tool/db_stress_test_base.cc", "db_stress_tool/db_stress_tool.cc", "db_stress_tool/expected_state.cc", + "db_stress_tool/multi_ops_txns_stress.cc", "db_stress_tool/no_batched_ops_stress.cc", "test_util/testutil.cc", "tools/block_cache_analyzer/block_cache_trace_analyzer.cc", diff --git a/db_stress_tool/CMakeLists.txt b/db_stress_tool/CMakeLists.txt index afbc5fa20..96d70dd0e 100644 --- a/db_stress_tool/CMakeLists.txt +++ b/db_stress_tool/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(db_stress${ARTIFACT_SUFFIX} db_stress_test_base.cc db_stress_tool.cc expected_state.cc + multi_ops_txns_stress.cc no_batched_ops_stress.cc) target_link_libraries(db_stress${ARTIFACT_SUFFIX} ${ROCKSDB_LIB} ${THIRDPARTY_LIBS}) list(APPEND tool_deps db_stress) diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 7cc0a9110..d2e02cdd1 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -87,6 +87,7 @@ DECLARE_int64(active_width); DECLARE_bool(test_batches_snapshots); DECLARE_bool(atomic_flush); DECLARE_bool(test_cf_consistency); +DECLARE_bool(test_multi_ops_txns); DECLARE_int32(threads); DECLARE_int32(ttl); DECLARE_int32(value_size_mult); @@ -203,6 +204,7 @@ DECLARE_int32(delrangepercent); DECLARE_int32(nooverwritepercent); DECLARE_int32(iterpercent); DECLARE_uint64(num_iterations); +DECLARE_int32(customopspercent); DECLARE_string(compression_type); DECLARE_string(bottommost_compression_type); DECLARE_int32(compression_max_dict_bytes); @@ -569,6 +571,8 @@ extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz); extern StressTest* CreateCfConsistencyStressTest(); extern StressTest* CreateBatchedOpsStressTest(); extern StressTest* CreateNonBatchedOpsStressTest(); +extern StressTest* CreateMultiOpsTxnsStressTest(); +extern void CheckAndSetOptionsForMultiOpsTxnStressTest(); 
extern void InitializeHotKeyGenerator(double alpha); extern int64_t GetOneHotKeyID(double rand_seed, int64_t max_key); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 4b8da962d..7af49fc05 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -90,6 +90,11 @@ DEFINE_bool(test_cf_consistency, false, "multiple column families are consistent. Setting this implies " "`atomic_flush=true` is set true if `disable_wal=false`.\n"); +DEFINE_bool(test_multi_ops_txns, false, + "If set, runs stress test dedicated to verifying multi-ops " + "transactions on a simple relational table with primary and " + "secondary index."); + DEFINE_int32(threads, 32, "Number of concurrent threads to run."); DEFINE_int32(ttl, -1, @@ -675,6 +680,10 @@ DEFINE_uint64(num_iterations, 10, "Number of iterations per MultiIterate run"); static const bool FLAGS_num_iterations_dummy __attribute__((__unused__)) = RegisterFlagValidator(&FLAGS_num_iterations, &ValidateUint32Range); +DEFINE_int32( + customopspercent, 0, + "Ratio of custom operations to total workload (expressed as a percentage)"); + DEFINE_string(compression_type, "snappy", "Algorithm to use to compress the database"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index 7fc4fc000..dcfbd4589 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -625,11 +625,15 @@ void StressTest::OperateDb(ThreadState* thread) { write_opts.sync = true; } write_opts.disableWAL = FLAGS_disable_wal; - const int prefixBound = static_cast(FLAGS_readpercent) + - static_cast(FLAGS_prefixpercent); - const int writeBound = prefixBound + static_cast(FLAGS_writepercent); - const int delBound = writeBound + static_cast(FLAGS_delpercent); - const int delRangeBound = delBound + static_cast(FLAGS_delrangepercent); + const int prefix_bound = static_cast(FLAGS_readpercent) + + static_cast(FLAGS_prefixpercent); + 
const int write_bound = prefix_bound + static_cast(FLAGS_writepercent); + const int del_bound = write_bound + static_cast(FLAGS_delpercent); + const int delrange_bound = + del_bound + static_cast(FLAGS_delrangepercent); + const int iterate_bound = + delrange_bound + static_cast(FLAGS_iterpercent); + const uint64_t ops_per_open = FLAGS_ops_per_thread / (FLAGS_reopen + 1); #ifndef NDEBUG @@ -885,7 +889,7 @@ void StressTest::OperateDb(ThreadState* thread) { } else { TestGet(thread, read_opts, rand_column_families, rand_keys); } - } else if (prob_op < prefixBound) { + } else if (prob_op < prefix_bound) { assert(static_cast(FLAGS_readpercent) <= prob_op); // OPERATION prefix scan // keys are 8 bytes long, prefix size is FLAGS_prefix_size. There are @@ -893,22 +897,22 @@ void StressTest::OperateDb(ThreadState* thread) { // be 2 ^ ((8 - FLAGS_prefix_size) * 8) possible keys with the same // prefix TestPrefixScan(thread, read_opts, rand_column_families, rand_keys); - } else if (prob_op < writeBound) { - assert(prefixBound <= prob_op); + } else if (prob_op < write_bound) { + assert(prefix_bound <= prob_op); // OPERATION write TestPut(thread, write_opts, read_opts, rand_column_families, rand_keys, value, lock); - } else if (prob_op < delBound) { - assert(writeBound <= prob_op); + } else if (prob_op < del_bound) { + assert(write_bound <= prob_op); // OPERATION delete TestDelete(thread, write_opts, rand_column_families, rand_keys, lock); - } else if (prob_op < delRangeBound) { - assert(delBound <= prob_op); + } else if (prob_op < delrange_bound) { + assert(del_bound <= prob_op); // OPERATION delete range TestDeleteRange(thread, write_opts, rand_column_families, rand_keys, lock); - } else { - assert(delRangeBound <= prob_op); + } else if (prob_op < iterate_bound) { + assert(delrange_bound <= prob_op); // OPERATION iterate int num_seeks = static_cast( std::min(static_cast(thread->rand.Uniform(4)), @@ -916,6 +920,9 @@ void StressTest::OperateDb(ThreadState* thread) { rand_keys = 
GenerateNKeys(thread, num_seeks, i); i += num_seeks - 1; TestIterate(thread, read_opts, rand_column_families, rand_keys); + } else { + assert(iterate_bound <= prob_op); + TestCustomOperations(thread, rand_column_families); } thread->stats.FinishedSingleOp(); #ifndef ROCKSDB_LITE @@ -2122,6 +2129,7 @@ void StressTest::PrintEnv() const { fprintf(stdout, "No overwrite percentage : %d%%\n", FLAGS_nooverwritepercent); fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent); + fprintf(stdout, "Custom ops percentage : %d%%\n", FLAGS_customopspercent); fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n", FLAGS_db_write_buffer_size); fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size); @@ -2752,7 +2760,7 @@ void StressTest::Reopen(ThreadState* thread) { // the db via a callbac ii) they hold on to a snapshot and the upcoming // ::Close would complain about it. const bool write_prepared = FLAGS_use_txn && FLAGS_txn_write_policy != 0; - bool bg_canceled = false; + bool bg_canceled __attribute__((unused)) = false; if (write_prepared || thread->rand.OneIn(2)) { const bool wait = write_prepared || static_cast(thread->rand.OneIn(2)); @@ -2760,7 +2768,6 @@ void StressTest::Reopen(ThreadState* thread) { bg_canceled = wait; } assert(!write_prepared || bg_canceled); - (void) bg_canceled; #else (void) thread; #endif diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 7b3ec1ea1..60ecf46c1 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -32,7 +32,7 @@ class StressTest { void InitDb(); // The initialization work is split into two parts to avoid a circular // dependency with `SharedState`. - void FinishInitDb(SharedState*); + virtual void FinishInitDb(SharedState*); // Return false if verification fails. 
bool VerifySecondaries(); @@ -203,6 +203,12 @@ class StressTest { const std::vector& rand_keys); #endif // !ROCKSDB_LITE + virtual Status TestCustomOperations( + ThreadState* /*thread*/, + const std::vector& /*rand_column_families*/) { + return Status::NotSupported("TestCustomOperations() must be overridden"); + } + void VerificationAbort(SharedState* shared, std::string msg, Status s) const; void VerificationAbort(SharedState* shared, std::string msg, int cf, diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 09d57cd9f..1020803b2 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -150,14 +150,18 @@ int db_stress_tool(int argc, char** argv) { exit(1); } if ((FLAGS_readpercent + FLAGS_prefixpercent + FLAGS_writepercent + - FLAGS_delpercent + FLAGS_delrangepercent + FLAGS_iterpercent) != 100) { - fprintf(stderr, - "Error: " - "Read(%d)+Prefix(%d)+Write(%d)+Delete(%d)+DeleteRange(%d)" - "+Iterate(%d) percents != " - "100!\n", - FLAGS_readpercent, FLAGS_prefixpercent, FLAGS_writepercent, - FLAGS_delpercent, FLAGS_delrangepercent, FLAGS_iterpercent); + FLAGS_delpercent + FLAGS_delrangepercent + FLAGS_iterpercent + + FLAGS_customopspercent) != 100) { + fprintf( + stderr, + "Error: " + "Read(-readpercent=%d)+Prefix(-prefixpercent=%d)+Write(-writepercent=%" + "d)+Delete(-delpercent=%d)+DeleteRange(-delrangepercent=%d)" + "+Iterate(-iterpercent=%d)+CustomOps(-customopspercent=%d) percents != " + "100!\n", + FLAGS_readpercent, FLAGS_prefixpercent, FLAGS_writepercent, + FLAGS_delpercent, FLAGS_delrangepercent, FLAGS_iterpercent, + FLAGS_customopspercent); exit(1); } if (FLAGS_disable_wal == 1 && FLAGS_reopen > 0) { @@ -287,6 +291,9 @@ int db_stress_tool(int argc, char** argv) { "batch_protection_bytes_per_key > 0\n"); exit(1); } + if (FLAGS_test_multi_ops_txns) { + CheckAndSetOptionsForMultiOpsTxnStressTest(); + } #ifndef NDEBUG KillPoint* kp = KillPoint::GetInstance(); @@ -331,6 +338,8 @@ int 
db_stress_tool(int argc, char** argv) { stress.reset(CreateCfConsistencyStressTest()); } else if (FLAGS_test_batches_snapshots) { stress.reset(CreateBatchedOpsStressTest()); + } else if (FLAGS_test_multi_ops_txns) { + stress.reset(CreateMultiOpsTxnsStressTest()); } else { stress.reset(CreateNonBatchedOpsStressTest()); } diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc new file mode 100644 index 000000000..cdd060109 --- /dev/null +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -0,0 +1,1035 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifdef GFLAGS +#include "db_stress_tool/multi_ops_txns_stress.h" + +#include "rocksdb/utilities/write_batch_with_index.h" +#include "util/defer.h" +#ifndef NDEBUG +#include "utilities/fault_injection_fs.h" +#endif // NDEBUG + +namespace ROCKSDB_NAMESPACE { + +// TODO: move these to gflags. 
+static constexpr uint32_t kInitNumC = 1000; +static constexpr uint32_t kInitialCARatio = 3; +static constexpr bool kDoPreload = true; + +std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey(uint32_t a) { + char buf[8]; + EncodeFixed32(buf, kPrimaryIndexId); + std::reverse(buf, buf + 4); + EncodeFixed32(buf + 4, a); + std::reverse(buf + 4, buf + 8); + return std::string(buf, sizeof(buf)); +} + +std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c) { + char buf[8]; + EncodeFixed32(buf, kSecondaryIndexId); + std::reverse(buf, buf + 4); + EncodeFixed32(buf + 4, c); + std::reverse(buf + 4, buf + 8); + return std::string(buf, sizeof(buf)); +} + +std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey(uint32_t c, + uint32_t a) { + char buf[12]; + EncodeFixed32(buf, kSecondaryIndexId); + std::reverse(buf, buf + 4); + EncodeFixed32(buf + 4, c); + EncodeFixed32(buf + 8, a); + std::reverse(buf + 4, buf + 8); + std::reverse(buf + 8, buf + 12); + return std::string(buf, sizeof(buf)); +} + +std::tuple +MultiOpsTxnsStressTest::Record::DecodePrimaryIndexValue( + Slice primary_index_value) { + if (primary_index_value.size() != 8) { + return std::tuple{Status::Corruption(""), 0, 0}; + } + uint32_t b = 0; + uint32_t c = 0; + if (!GetFixed32(&primary_index_value, &b) || + !GetFixed32(&primary_index_value, &c)) { + assert(false); + return std::tuple{Status::Corruption(""), 0, 0}; + } + return std::tuple{Status::OK(), b, c}; +} + +std::pair +MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexValue( + Slice secondary_index_value) { + if (secondary_index_value.size() != 4) { + return std::make_pair(Status::Corruption(""), 0); + } + uint32_t crc = 0; + bool result __attribute__((unused)) = + GetFixed32(&secondary_index_value, &crc); + assert(result); + return std::make_pair(Status::OK(), crc); +} + +std::pair +MultiOpsTxnsStressTest::Record::EncodePrimaryIndexEntry() const { + std::string primary_index_key = EncodePrimaryKey(); + std::string 
primary_index_value = EncodePrimaryIndexValue(); + return std::make_pair(primary_index_key, primary_index_value); +} + +std::string MultiOpsTxnsStressTest::Record::EncodePrimaryKey() const { + return EncodePrimaryKey(a_); +} + +std::string MultiOpsTxnsStressTest::Record::EncodePrimaryIndexValue() const { + char buf[8]; + EncodeFixed32(buf, b_); + EncodeFixed32(buf + 4, c_); + return std::string(buf, sizeof(buf)); +} + +std::pair +MultiOpsTxnsStressTest::Record::EncodeSecondaryIndexEntry() const { + std::string secondary_index_key; + char buf[12]; + EncodeFixed32(buf, kSecondaryIndexId); + std::reverse(buf, buf + 4); + EncodeFixed32(buf + 4, c_); + EncodeFixed32(buf + 8, a_); + std::reverse(buf + 4, buf + 8); + std::reverse(buf + 8, buf + 12); + secondary_index_key.assign(buf, sizeof(buf)); + + // Secondary index value is always 4-byte crc32 of the secondary key + std::string secondary_index_value; + uint32_t crc = crc32c::Value(buf, sizeof(buf)); + PutFixed32(&secondary_index_value, crc); + return std::make_pair(secondary_index_key, secondary_index_value); +} + +std::string MultiOpsTxnsStressTest::Record::EncodeSecondaryKey() const { + char buf[12]; + EncodeFixed32(buf, kSecondaryIndexId); + std::reverse(buf, buf + 4); + EncodeFixed32(buf + 4, c_); + EncodeFixed32(buf + 8, a_); + std::reverse(buf + 4, buf + 8); + std::reverse(buf + 8, buf + 12); + return std::string(buf, sizeof(buf)); +} + +Status MultiOpsTxnsStressTest::Record::DecodePrimaryIndexEntry( + Slice primary_index_key, Slice primary_index_value) { + if (primary_index_key.size() != 8) { + assert(false); + return Status::Corruption("Primary index key length is not 8"); + } + + const char* const index_id_buf = primary_index_key.data(); + uint32_t index_id = + static_cast(static_cast(index_id_buf[0])) << 24; + index_id += static_cast(static_cast(index_id_buf[1])) + << 16; + index_id += static_cast(static_cast(index_id_buf[2])) + << 8; + index_id += + static_cast(static_cast(index_id_buf[3])); + 
primary_index_key.remove_prefix(sizeof(uint32_t)); + if (index_id != kPrimaryIndexId) { + std::ostringstream oss; + oss << "Unexpected primary index id: " << index_id; + return Status::Corruption(oss.str()); + } + + const char* const buf = primary_index_key.data(); + a_ = static_cast(static_cast(buf[0])) << 24; + a_ += static_cast(static_cast(buf[1])) << 16; + a_ += static_cast(static_cast(buf[2])) << 8; + a_ += static_cast(static_cast(buf[3])); + + if (primary_index_value.size() != 8) { + return Status::Corruption("Primary index value length is not 8"); + } + GetFixed32(&primary_index_value, &b_); + GetFixed32(&primary_index_value, &c_); + return Status::OK(); +} + +Status MultiOpsTxnsStressTest::Record::DecodeSecondaryIndexEntry( + Slice secondary_index_key, Slice secondary_index_value) { + if (secondary_index_key.size() != 12) { + return Status::Corruption("Secondary index key length is not 12"); + } + uint32_t crc = + crc32c::Value(secondary_index_key.data(), secondary_index_key.size()); + + const char* const index_id_buf = secondary_index_key.data(); + uint32_t index_id = + static_cast(static_cast(index_id_buf[0])) << 24; + index_id += static_cast(static_cast(index_id_buf[1])) + << 16; + index_id += static_cast(static_cast(index_id_buf[2])) + << 8; + index_id += + static_cast(static_cast(index_id_buf[3])); + secondary_index_key.remove_prefix(sizeof(uint32_t)); + if (index_id != kSecondaryIndexId) { + std::ostringstream oss; + oss << "Unexpected secondary index id: " << index_id; + return Status::Corruption(oss.str()); + } + + const char* const buf = secondary_index_key.data(); + assert(secondary_index_key.size() == 8); + c_ = static_cast(static_cast(buf[0])) << 24; + c_ += static_cast(static_cast(buf[1])) << 16; + c_ += static_cast(static_cast(buf[2])) << 8; + c_ += static_cast(static_cast(buf[3])); + + a_ = static_cast(static_cast(buf[4])) << 24; + a_ += static_cast(static_cast(buf[5])) << 16; + a_ += static_cast(static_cast(buf[6])) << 8; + a_ += 
static_cast(static_cast(buf[7])); + + if (secondary_index_value.size() != 4) { + return Status::Corruption("Secondary index value length is not 4"); + } + uint32_t val = 0; + GetFixed32(&secondary_index_value, &val); + if (val != crc) { + std::ostringstream oss; + oss << "Secondary index key checksum mismatch, stored: " << val + << ", recomputed: " << crc; + return Status::Corruption(oss.str()); + } + return Status::OK(); +} + +void MultiOpsTxnsStressTest::FinishInitDb(SharedState* shared) { + if (FLAGS_enable_compaction_filter) { + // TODO (yanqin) enable compaction filter + } + if (kDoPreload) { + ReopenAndPreloadDb(shared); + } +} + +void MultiOpsTxnsStressTest::ReopenAndPreloadDb(SharedState* shared) { + (void)shared; +#ifndef ROCKSDB_LITE + std::vector cf_descs; + for (const auto* handle : column_families_) { + cf_descs.emplace_back(handle->GetName(), ColumnFamilyOptions(options_)); + } + CancelAllBackgroundWork(db_, /*wait=*/true); + for (auto* handle : column_families_) { + delete handle; + } + column_families_.clear(); + delete db_; + db_ = nullptr; + txn_db_ = nullptr; + + TransactionDBOptions txn_db_opts; + txn_db_opts.skip_concurrency_control = true; // speed-up preloading + Status s = TransactionDB::Open(options_, txn_db_opts, FLAGS_db, cf_descs, + &column_families_, &txn_db_); + if (s.ok()) { + db_ = txn_db_; + } else { + fprintf(stderr, "Failed to open db: %s\n", s.ToString().c_str()); + exit(1); + } + + PreloadDb(shared, kInitNumC); + + // Reopen + CancelAllBackgroundWork(db_, /*wait=*/true); + for (auto* handle : column_families_) { + delete handle; + } + column_families_.clear(); + s = db_->Close(); + if (!s.ok()) { + fprintf(stderr, "Error during closing db: %s\n", s.ToString().c_str()); + exit(1); + } + delete db_; + db_ = nullptr; + txn_db_ = nullptr; + + Open(); +#endif // !ROCKSDB_LITE +} + +// Used for point-lookup transaction +Status MultiOpsTxnsStressTest::TestGet( + ThreadState* thread, const ReadOptions& read_opts, + const std::vector& 
/*rand_column_families*/, + const std::vector& /*rand_keys*/) { + uint32_t a = ChooseA(thread); + return PointLookupTxn(thread, read_opts, a); +} + +// Not used. +std::vector MultiOpsTxnsStressTest::TestMultiGet( + ThreadState* /*thread*/, const ReadOptions& /*read_opts*/, + const std::vector& /*rand_column_families*/, + const std::vector& /*rand_keys*/) { + return std::vector{Status::NotSupported()}; +} + +Status MultiOpsTxnsStressTest::TestPrefixScan( + ThreadState* thread, const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) { + (void)thread; + (void)read_opts; + (void)rand_column_families; + (void)rand_keys; + return Status::OK(); +} + +// Given a key K, this creates an iterator which scans to K and then +// does a random sequence of Next/Prev operations. +Status MultiOpsTxnsStressTest::TestIterate( + ThreadState* thread, const ReadOptions& read_opts, + const std::vector& /*rand_column_families*/, + const std::vector& /*rand_keys*/) { + uint32_t c = thread->rand.Next() % kInitNumC; + return RangeScanTxn(thread, read_opts, c); +} + +// Not intended for use. +Status MultiOpsTxnsStressTest::TestPut(ThreadState* /*thread*/, + WriteOptions& /*write_opts*/, + const ReadOptions& /*read_opts*/, + const std::vector& /*cf_ids*/, + const std::vector& /*keys*/, + char (&value)[100], + std::unique_ptr& /*lock*/) { + (void)value; + return Status::NotSupported(); +} + +// Not intended for use. +Status MultiOpsTxnsStressTest::TestDelete( + ThreadState* /*thread*/, WriteOptions& /*write_opts*/, + const std::vector& /*rand_column_families*/, + const std::vector& /*rand_keys*/, + std::unique_ptr& /*lock*/) { + return Status::NotSupported(); +} + +// Not intended for use. 
+Status MultiOpsTxnsStressTest::TestDeleteRange( + ThreadState* /*thread*/, WriteOptions& /*write_opts*/, + const std::vector& /*rand_column_families*/, + const std::vector& /*rand_keys*/, + std::unique_ptr& /*lock*/) { + return Status::NotSupported(); +} + +void MultiOpsTxnsStressTest::TestIngestExternalFile( + ThreadState* thread, const std::vector& rand_column_families, + const std::vector& /*rand_keys*/, + std::unique_ptr& /*lock*/) { + // TODO (yanqin) + (void)thread; + (void)rand_column_families; +} + +void MultiOpsTxnsStressTest::TestCompactRange( + ThreadState* thread, int64_t /*rand_key*/, const Slice& /*start_key*/, + ColumnFamilyHandle* column_family) { + // TODO (yanqin). + // May use GetRangeHash() for validation before and after DB::CompactRange() + // completes. + (void)thread; + (void)column_family; +} + +Status MultiOpsTxnsStressTest::TestBackupRestore( + ThreadState* thread, const std::vector& rand_column_families, + const std::vector& /*rand_keys*/) { + // TODO (yanqin) + (void)thread; + (void)rand_column_families; + return Status::OK(); +} + +Status MultiOpsTxnsStressTest::TestCheckpoint( + ThreadState* thread, const std::vector& rand_column_families, + const std::vector& /*rand_keys*/) { + // TODO (yanqin) + (void)thread; + (void)rand_column_families; + return Status::OK(); +} + +#ifndef ROCKSDB_LITE +Status MultiOpsTxnsStressTest::TestApproximateSize( + ThreadState* thread, uint64_t iteration, + const std::vector& rand_column_families, + const std::vector& /*rand_keys*/) { + // TODO (yanqin) + (void)thread; + (void)iteration; + (void)rand_column_families; + return Status::OK(); +} +#endif // !ROCKSDB_LITE + +Status MultiOpsTxnsStressTest::TestCustomOperations( + ThreadState* thread, const std::vector& rand_column_families) { + (void)rand_column_families; + // Randomly choose from 0, 1, and 2. + // TODO (yanqin) allow user to configure probability of each operation. 
+ uint32_t rand = thread->rand.Uniform(3); + Status s; + if (0 == rand) { + // Update primary key. + uint32_t old_a = ChooseA(thread); + uint32_t new_a = GenerateNextA(); + s = PrimaryKeyUpdateTxn(thread, old_a, new_a); + } else if (1 == rand) { + // Update secondary key. + uint32_t old_c = thread->rand.Next() % kInitNumC; + int count = 0; + uint32_t new_c = 0; + do { + ++count; + new_c = thread->rand.Next() % kInitNumC; + } while (count < 100 && new_c == old_c); + if (count >= 100) { + // If we reach here, it means our random number generator has a serious + // problem, or kInitNumC is chosen poorly. + std::terminate(); + } + s = SecondaryKeyUpdateTxn(thread, old_c, new_c); + } else if (2 == rand) { + // Update primary index value. + uint32_t a = ChooseA(thread); + s = UpdatePrimaryIndexValueTxn(thread, a, /*b_delta=*/1); + } else { + // Should never reach here. + assert(false); + } + return s; +} + +Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, + uint32_t old_a, + uint32_t new_a) { +#ifdef ROCKSDB_LITE + (void)thread; + (void)old_a; + (void)new_a; + return Status::NotSupported(); +#else + std::string old_pk = Record::EncodePrimaryKey(old_a); + std::string new_pk = Record::EncodePrimaryKey(new_a); + Transaction* txn = nullptr; + WriteOptions wopts; + Status s = NewTxn(wopts, &txn); + if (!s.ok()) { + assert(!txn); + thread->stats.AddErrors(1); + return s; + } + + assert(txn); + txn->SetSnapshotOnNextOperation(/*notifier=*/nullptr); + + const Defer cleanup([&s, thread, txn, this]() { + if (s.ok()) { + // Two gets, one for existing pk, one for locking potential new pk. + thread->stats.AddGets(/*ngets=*/2, /*nfounds=*/1); + thread->stats.AddDeletes(1); + thread->stats.AddBytesForWrites( + /*nwrites=*/2, + Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize); + thread->stats.AddSingleDeletes(1); + return; + } + if (s.IsNotFound()) { + thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0); + } else if (s.IsBusy()) { + // ignore. 
+ } else { + thread->stats.AddErrors(1); + } + RollbackTxn(txn).PermitUncheckedError(); + }); + + ReadOptions ropts; + std::string value; + s = txn->GetForUpdate(ropts, old_pk, &value); + if (!s.ok()) { + return s; + } + std::string empty_value; + s = txn->GetForUpdate(ropts, new_pk, &empty_value); + if (s.ok()) { + assert(!empty_value.empty()); + s = Status::Busy(); + return s; + } else if (!s.IsNotFound()) { + // Only NotFound means new_pk is free to take. Any other failure must end + // the transaction here rather than fall through to the writes below, + // which pass assume_tracked=true for new_pk and would overwrite `s`. + return s; + } + + auto result = Record::DecodePrimaryIndexValue(value); + s = std::get<0>(result); + if (!s.ok()) { + return s; + } + uint32_t b = std::get<1>(result); + uint32_t c = std::get<2>(result); + + ColumnFamilyHandle* cf = db_->DefaultColumnFamily(); + s = txn->Delete(cf, old_pk, /*assume_tracked=*/true); + if (!s.ok()) { + return s; + } + s = txn->Put(cf, new_pk, value, /*assume_tracked=*/true); + if (!s.ok()) { + return s; + } + + auto* wb = txn->GetWriteBatch(); + assert(wb); + + std::string old_sk = Record::EncodeSecondaryKey(c, old_a); + s = wb->SingleDelete(old_sk); + if (!s.ok()) { + return s; + } + + Record record(new_a, b, c); + std::string new_sk; + std::string new_crc; + std::tie(new_sk, new_crc) = record.EncodeSecondaryIndexEntry(); + s = wb->Put(new_sk, new_crc); + if (!s.ok()) { + return s; + } + + s = CommitTxn(txn); + return s; +#endif // !ROCKSDB_LITE +} + +Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, + uint32_t old_c, + uint32_t new_c) { +#ifdef ROCKSDB_LITE + (void)thread; + (void)old_c; + (void)new_c; + return Status::NotSupported(); +#else + Transaction* txn = nullptr; + WriteOptions wopts; + Status s = NewTxn(wopts, &txn); + if (!s.ok()) { + assert(!txn); + thread->stats.AddErrors(1); + return s; + } + + assert(txn); + + Iterator* it = nullptr; + long iterations = 0; + const Defer cleanup([&s, thread, &it, txn, this, &iterations]() { + delete it; + if (s.ok()) { + thread->stats.AddIterations(iterations); + thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); + thread->stats.AddSingleDeletes(1); + thread->stats.AddBytesForWrites( + 
/*nwrites=*/2, + Record::kPrimaryIndexEntrySize + Record::kSecondaryIndexEntrySize); + return; + } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || + s.IsMergeInProgress()) { + // ww-conflict detected, or + // lock cannot be acquired, or + // memtable history is not large enough for conflict checking, or + // Merge operation cannot be resolved. + // TODO (yanqin) add stats for other cases? + } else if (s.IsNotFound()) { + // ignore. + } else { + thread->stats.AddErrors(1); + } + RollbackTxn(txn).PermitUncheckedError(); + }); + + // TODO (yanqin) try SetSnapshotOnNextOperation(). We currently need to take + // a snapshot here because we will later verify that point lookup in the + // primary index using GetForUpdate() returns the same value for 'c' as the + // iterator. The iterator does not need a snapshot though, because it will be + // assigned the current latest (published) sequence in the db, which will be + // no smaller than the snapshot created here. The GetForUpdate will perform + // ww conflict checking to ensure GetForUpdate() (using the snapshot) sees + // the same data as this iterator. + txn->SetSnapshot(); + std::string old_sk_prefix = Record::EncodeSecondaryKey(old_c); + std::string iter_ub_str = Record::EncodeSecondaryKey(old_c + 1); + Slice iter_ub = iter_ub_str; + ReadOptions ropts; + if (thread->rand.OneIn(2)) { + ropts.snapshot = txn->GetSnapshot(); + } + ropts.total_order_seek = true; + ropts.iterate_upper_bound = &iter_ub; + it = txn->GetIterator(ropts); + + assert(it); + it->Seek(old_sk_prefix); + if (!it->Valid()) { + s = Status::NotFound(); + return s; + } + auto* wb = txn->GetWriteBatch(); + assert(wb); + + do { + ++iterations; + Record record; + s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); + if (!s.ok()) { + VerificationAbort(thread->shared, "Cannot decode secondary key", s); + break; + } + // At this point, record.b is not known yet, thus we need to access + // primary index. 
+ std::string pk = Record::EncodePrimaryKey(record.a_value()); + std::string value; + ReadOptions read_opts; + read_opts.snapshot = txn->GetSnapshot(); + s = txn->GetForUpdate(read_opts, pk, &value); + if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || + s.IsMergeInProgress()) { + // Write conflict, or cannot acquire lock, or memtable size is not large + // enough, or merge cannot be resolved. + break; + } else if (!s.ok()) { + // We can also fail verification here. + VerificationAbort(thread->shared, "pk should exist, but does not", s); + break; + } + auto result = Record::DecodePrimaryIndexValue(value); + s = std::get<0>(result); + if (!s.ok()) { + VerificationAbort(thread->shared, "Cannot decode primary index value", s); + break; + } + uint32_t b = std::get<1>(result); + uint32_t c = std::get<2>(result); + if (c != old_c) { + std::ostringstream oss; + oss << "c in primary index does not match secondary index: " << c + << " != " << old_c; + s = Status::Corruption(); + VerificationAbort(thread->shared, oss.str(), s); + break; + } + Record new_rec(record.a_value(), b, new_c); + std::string new_primary_index_value = new_rec.EncodePrimaryIndexValue(); + ColumnFamilyHandle* cf = db_->DefaultColumnFamily(); + s = txn->Put(cf, pk, new_primary_index_value, /*assume_tracked=*/true); + if (!s.ok()) { + break; + } + std::string old_sk = it->key().ToString(/*hex=*/false); + std::string new_sk; + std::string new_crc; + std::tie(new_sk, new_crc) = new_rec.EncodeSecondaryIndexEntry(); + s = wb->SingleDelete(old_sk); + if (!s.ok()) { + break; + } + s = wb->Put(new_sk, new_crc); + if (!s.ok()) { + break; + } + + it->Next(); + } while (it->Valid()); + + if (!s.ok()) { + return s; + } + + s = CommitTxn(txn); + + return s; +#endif // !ROCKSDB_LITE +} + +Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, + uint32_t a, + uint32_t b_delta) { +#ifdef ROCKSDB_LITE + (void)thread; + (void)a; + (void)b_delta; + return Status::NotSupported(); +#else + 
std::string pk_str = Record::EncodePrimaryKey(a); + Transaction* txn = nullptr; + WriteOptions wopts; + Status s = NewTxn(wopts, &txn); + if (!s.ok()) { + assert(!txn); + thread->stats.AddErrors(1); + return s; + } + + assert(txn); + + const Defer cleanup([&s, thread, txn, this]() { + if (s.ok()) { + thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); + thread->stats.AddBytesForWrites( + /*nwrites=*/1, /*nbytes=*/Record::kPrimaryIndexEntrySize); + return; + } + if (s.IsNotFound()) { + thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0); + } else if (s.IsInvalidArgument()) { + // ignored. + } else if (s.IsBusy() || s.IsTimedOut() || s.IsTryAgain() || + s.IsMergeInProgress()) { + // ignored. + } else { + thread->stats.AddErrors(1); + } + RollbackTxn(txn).PermitUncheckedError(); + }); + ReadOptions ropts; + std::string value; + s = txn->GetForUpdate(ropts, pk_str, &value); + if (!s.ok()) { + return s; + } + auto result = Record::DecodePrimaryIndexValue(value); + // Store the decode status in `s` before returning: the cleanup lambda and + // the caller both read `s`, which otherwise still holds the OK status from + // GetForUpdate() and would report success without rolling back. + s = std::get<0>(result); + if (!s.ok()) { + return s; + } + uint32_t b = std::get<1>(result) + b_delta; + uint32_t c = std::get<2>(result); + Record record(a, b, c); + std::string primary_index_value = record.EncodePrimaryIndexValue(); + ColumnFamilyHandle* cf = db_->DefaultColumnFamily(); + s = txn->Put(cf, pk_str, primary_index_value, /*assume_tracked=*/true); + if (!s.ok()) { + return s; + } + s = CommitTxn(txn); + return s; +#endif // !ROCKSDB_LITE +} + +Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, + ReadOptions ropts, uint32_t a) { +#ifdef ROCKSDB_LITE + (void)thread; + (void)ropts; + (void)a; + return Status::NotSupported(); +#else + std::string pk_str = Record::EncodePrimaryKey(a); + // pk may or may not exist + PinnableSlice value; + + Transaction* txn = nullptr; + WriteOptions wopts; + Status s = NewTxn(wopts, &txn); + if (!s.ok()) { + assert(!txn); + thread->stats.AddErrors(1); + return s; + } + + assert(txn); + + const Defer cleanup([&s, thread, txn, this]() { + if (s.ok()) { + 
thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/1); + return; + } else if (s.IsNotFound()) { + thread->stats.AddGets(/*ngets=*/1, /*nfounds=*/0); + } else { + thread->stats.AddErrors(1); + } + RollbackTxn(txn).PermitUncheckedError(); + }); + + s = txn->Get(ropts, db_->DefaultColumnFamily(), pk_str, &value); + if (s.ok()) { + // NOTE(review): commits directly instead of via CommitTxn(txn) as the + // update transactions do -- confirm `txn` is still released on this path. + s = txn->Commit(); + } + return s; +#endif // !ROCKSDB_LITE +} + +Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, + ReadOptions ropts, uint32_t c) { +#ifdef ROCKSDB_LITE + (void)thread; + (void)ropts; + (void)c; + return Status::NotSupported(); +#else + std::string sk = Record::EncodeSecondaryKey(c); + + Transaction* txn = nullptr; + WriteOptions wopts; + Status s = NewTxn(wopts, &txn); + if (!s.ok()) { + assert(!txn); + thread->stats.AddErrors(1); + return s; + } + + assert(txn); + + const Defer cleanup([&s, thread, txn, this]() { + if (s.ok()) { + thread->stats.AddIterations(1); + return; + } + thread->stats.AddErrors(1); + RollbackTxn(txn).PermitUncheckedError(); + }); + std::unique_ptr iter(txn->GetIterator(ropts)); + iter->Seek(sk); + if (iter->status().ok()) { + s = txn->Commit(); + } else { + s = iter->status(); + } + // TODO (yanqin) more Seek/SeekForPrev/Next/Prev/SeekToFirst/SeekToLast + return s; +#endif // !ROCKSDB_LITE +} + +void MultiOpsTxnsStressTest::VerifyDb(ThreadState* thread) const { + if (thread->shared->HasVerificationFailedYet()) { + return; + } + const Snapshot* const snapshot = db_->GetSnapshot(); + assert(snapshot); + ManagedSnapshot snapshot_guard(db_, snapshot); + + // TODO (yanqin) with a probability, we can use either forward or backward + // iterator in subsequent checks. We can also use more advanced features in + // range scan. For now, let's just use simple forward iteration with + // total_order_seek = true. + + // First, iterate primary index.
+ size_t primary_index_entries_count = 0; + { + // Upper bound is the encoded index id kPrimaryIndexId + 1, i.e. the scan + // stops once it leaves the primary index. + char buf[4]; + EncodeFixed32(buf, Record::kPrimaryIndexId + 1); + std::reverse(buf, buf + sizeof(buf)); + std::string iter_ub_str(buf, sizeof(buf)); + Slice iter_ub = iter_ub_str; + + ReadOptions ropts; + ropts.snapshot = snapshot; + ropts.total_order_seek = true; + ropts.iterate_upper_bound = &iter_ub; + + std::unique_ptr it(db_->NewIterator(ropts)); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + ++primary_index_entries_count; + } + } + + // Second, iterate secondary index. + size_t secondary_index_entries_count = 0; + { + char buf[4]; + EncodeFixed32(buf, Record::kSecondaryIndexId); + std::reverse(buf, buf + sizeof(buf)); + const std::string start_key(buf, sizeof(buf)); + + ReadOptions ropts; + ropts.snapshot = snapshot; + ropts.total_order_seek = true; + + std::unique_ptr it(db_->NewIterator(ropts)); + for (it->Seek(start_key); it->Valid(); it->Next()) { + ++secondary_index_entries_count; + Record record; + Status s = record.DecodeSecondaryIndexEntry(it->key(), it->value()); + if (!s.ok()) { + VerificationAbort(thread->shared, "Cannot decode secondary index entry", + s); + return; + } + // After decoding secondary index entry, we know a and c. Crc is verified + // in decoding phase. + // + // Form a primary key and search in the primary index. + std::string pk = Record::EncodePrimaryKey(record.a_value()); + std::string value; + s = db_->Get(ropts, pk, &value); + if (!s.ok()) { + std::ostringstream oss; + oss << "Error searching pk " << Slice(pk).ToString(true) << ". " + << s.ToString(); + VerificationAbort(thread->shared, oss.str(), s); + return; + } + auto result = Record::DecodePrimaryIndexValue(value); + s = std::get<0>(result); + if (!s.ok()) { + std::ostringstream oss; + oss << "Error decoding primary index value " + << Slice(value).ToString(true) << ". 
" << s.ToString(); + VerificationAbort(thread->shared, oss.str(), s); + // The decoded fields are not usable after a decode failure, so stop here + // like the earlier abort branches instead of comparing them. + return; + } + uint32_t c_in_primary = std::get<2>(result); + if (c_in_primary != record.c_value()) { + std::ostringstream oss; + oss << "Pk/sk mismatch. pk: (c=" << c_in_primary + << "), sk: (c=" << record.c_value() << ")"; + VerificationAbort(thread->shared, oss.str(), s); + } + } + } + + if (secondary_index_entries_count != primary_index_entries_count) { + std::ostringstream oss; + oss << "Pk/sk mismatch: primary index has " << primary_index_entries_count + << " entries. Secondary index has " << secondary_index_entries_count + << " entries."; + VerificationAbort(thread->shared, oss.str(), Status::OK()); + } +} + +uint32_t MultiOpsTxnsStressTest::ChooseA(ThreadState* thread) { + uint32_t rnd = thread->rand.Uniform(5); + uint32_t next_a_low = next_a_.load(std::memory_order_relaxed); + assert(next_a_low != 0); + if (rnd == 0) { + return next_a_low - 1; + } + + uint32_t result = 0; + result = thread->rand.Next() % next_a_low; + if (thread->rand.OneIn(3)) { + return result; + } + uint32_t next_a_high = next_a_.load(std::memory_order_relaxed); + // A higher chance that this a still exists. + return next_a_low + (next_a_high - next_a_low) / 2; +} + +uint32_t MultiOpsTxnsStressTest::GenerateNextA() { + return next_a_.fetch_add(1, std::memory_order_relaxed); +} + +void MultiOpsTxnsStressTest::PreloadDb(SharedState* shared, size_t num_c) { +#ifdef ROCKSDB_LITE + (void)shared; + (void)num_c; +#else + // TODO (yanqin) maybe parallelize. Currently execute in single thread.
+ WriteOptions wopts; + wopts.disableWAL = true; + wopts.sync = false; + Random rnd(shared->GetSeed()); + assert(txn_db_); + for (uint32_t c = 0; c < static_cast(num_c); ++c) { + for (uint32_t a = c * kInitialCARatio; a < ((c + 1) * kInitialCARatio); + ++a) { + Record record(a, /*_b=*/rnd.Next(), c); + WriteBatch wb; + const auto primary_index_entry = record.EncodePrimaryIndexEntry(); + Status s = wb.Put(primary_index_entry.first, primary_index_entry.second); + assert(s.ok()); + const auto secondary_index_entry = record.EncodeSecondaryIndexEntry(); + s = wb.Put(secondary_index_entry.first, secondary_index_entry.second); + assert(s.ok()); + s = txn_db_->Write(wopts, &wb); + assert(s.ok()); + + // TODO (yanqin): make the following check optional, especially when data + // size is large. + Record tmp_rec; + tmp_rec.SetB(record.b_value()); + s = tmp_rec.DecodeSecondaryIndexEntry(secondary_index_entry.first, + secondary_index_entry.second); + assert(s.ok()); + assert(tmp_rec == record); + } + } + Status s = db_->Flush(FlushOptions()); + assert(s.ok()); + next_a_.store(static_cast((num_c + 1) * kInitialCARatio)); + fprintf(stdout, "DB preloaded with %d entries\n", + static_cast(num_c * kInitialCARatio)); +#endif // !ROCKSDB_LITE +} + +StressTest* CreateMultiOpsTxnsStressTest() { + return new MultiOpsTxnsStressTest(); +} + +// Validates the flag combination used with -test_multi_ops_txns. On the first +// unsupported setting it prints the reason to stderr and exits with status 1. +void CheckAndSetOptionsForMultiOpsTxnStressTest() { +#ifndef ROCKSDB_LITE + if (FLAGS_test_batches_snapshots || FLAGS_test_cf_consistency) { + fprintf(stderr, + "-test_multi_ops_txns is not compatible with " + "-test_batches_snapshots and -test_cf_consistency\n"); + exit(1); + } + if (!FLAGS_use_txn) { + fprintf(stderr, "-use_txn must be true if -test_multi_ops_txns\n"); + exit(1); + } + if (FLAGS_clear_column_family_one_in > 0) { + fprintf(stderr, + "-test_multi_ops_txns is not compatible with clearing column " + "families\n"); + exit(1); + } + if (FLAGS_column_families > 1) { + // TODO (yanqin) support separating primary index and secondary index in
+ // different column families. + fprintf(stderr, + "-test_multi_ops_txns currently does not use more than one column " + "family\n"); + exit(1); + } + if (FLAGS_writepercent > 0 || FLAGS_delpercent > 0 || + FLAGS_delrangepercent > 0) { + fprintf(stderr, + "-test_multi_ops_txns requires that -writepercent, -delpercent and " + "-delrangepercent be 0\n"); + exit(1); + } +#else + fprintf(stderr, "-test_multi_ops_txns not supported in ROCKSDB_LITE mode\n"); + exit(1); +#endif // !ROCKSDB_LITE +} +} // namespace ROCKSDB_NAMESPACE + +#endif // GFLAGS diff --git a/db_stress_tool/multi_ops_txns_stress.h b/db_stress_tool/multi_ops_txns_stress.h new file mode 100644 index 000000000..ac74e3f8e --- /dev/null +++ b/db_stress_tool/multi_ops_txns_stress.h @@ -0,0 +1,302 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifdef GFLAGS +#include "db_stress_tool/db_stress_common.h" + +namespace ROCKSDB_NAMESPACE { + +// This file defines MultiOpsTxnsStressTest so that we can stress test RocksDB +// transactions on a simple, emulated relational table. +// +// The record format is similar to the example found at +// https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format. +// +// The table is created by +// ``` +// create table t1 ( +// a int primary key, +// b int, +// c int, +// key(c), +// ) +// ``` +// +// (For simplicity, we use uint32_t for int here.) +// +// For this table, there is a primary index using `a`, as well as a secondary +// index using `c` and `a`.
+// +// Primary key format: +// | index id | M(a) | +// Primary index value: +// | b | c | +// M(a) represents the big-endian format of a. +// +// Secondary key format: +// | index id | M(c) | M(a) | +// Secondary index value: +// | crc32 | +// Similarly to M(a), M(c) is the big-endian format of c. +// +// The in-memory representation of a record is defined in class +// MultiOpsTxnsStressTest::Record that includes a number of helper methods to +// encode/decode primary index keys, primary index values, secondary index keys, +// secondary index values, etc. +// +// Sometimes primary index and secondary index reside on different column +// families, but sometimes they colocate in the same column family. Current +// implementation puts them in the same (default) column family, and this is +// subject to future change if we find it interesting to test the other case. +// +// Class MultiOpsTxnsStressTest has the following transactions for testing. +// +// 1. Primary key update +// UPDATE t1 SET a = 3 WHERE a = 2; +// ``` +// tx->GetForUpdate(primary key a=2) +// tx->GetForUpdate(primary key a=3) +// tx->Delete(primary key a=2) +// tx->Put(primary key a=3, value) +// tx->batch->SingleDelete(secondary key a=2) +// tx->batch->Put(secondary key a=3, value) +// tx->Prepare() +// tx->Commit() +// ``` +// +// 2. Secondary key update +// UPDATE t1 SET c = 3 WHERE c = 2; +// ``` +// iter->Seek(secondary key) +// // Get corresponding primary key value(s) from iterator +// tx->GetForUpdate(primary key) +// tx->Put(primary key, value c=3) +// tx->batch->SingleDelete(secondary key c=2) +// tx->batch->Put(secondary key c=3) +// tx->Prepare() +// tx->Commit() +// ``` +// +// 3. Primary index value update +// UPDATE t1 SET b = b + 1 WHERE a = 2; +// ``` +// tx->GetForUpdate(primary key a=2) +// tx->Put(primary key a=2, value b=b+1) +// tx->Prepare() +// tx->Commit() +// ``` +// +// 4.
Point lookup +// SELECT * FROM t1 WHERE a = 3; +// ``` +// tx->Get(primary key a=3) +// tx->Commit() +// ``` +// +// 5. Range scan +// SELECT * FROM t1 WHERE c = 2; +// ``` +// it = tx->GetIterator() +// it->Seek(secondary key c=2) +// tx->Commit() +// ``` + +class MultiOpsTxnsStressTest : public StressTest { + public: + class Record { + public: + static constexpr uint32_t kPrimaryIndexId = 1; + static constexpr uint32_t kSecondaryIndexId = 2; + + // Entry sizes in bytes, written as key size + value size: a primary entry + // is (index id, a) -> (b, c); a secondary entry is (index id, c, a) -> crc32. + static constexpr size_t kPrimaryIndexEntrySize = 8 + 8; + static constexpr size_t kSecondaryIndexEntrySize = 12 + 4; + + static_assert(kPrimaryIndexId < kSecondaryIndexId, + "kPrimaryIndexId must be smaller than kSecondaryIndexId"); + + static_assert(sizeof(kPrimaryIndexId) == sizeof(uint32_t), + "kPrimaryIndexId must be 4 bytes"); + static_assert(sizeof(kSecondaryIndexId) == sizeof(uint32_t), + "kSecondaryIndexId must be 4 bytes"); + + // Used for generating search key to probe primary index. + static std::string EncodePrimaryKey(uint32_t a); + // Used for generating search prefix to probe secondary index. + static std::string EncodeSecondaryKey(uint32_t c); + // Used for generating search key to probe secondary index.
+ static std::string EncodeSecondaryKey(uint32_t c, uint32_t a); + + static std::tuple DecodePrimaryIndexValue( + Slice primary_index_value); + + static std::pair DecodeSecondaryIndexValue( + Slice secondary_index_value); + + Record() = default; + Record(uint32_t _a, uint32_t _b, uint32_t _c) : a_(_a), b_(_b), c_(_c) {} + + bool operator==(const Record& other) const { + return a_ == other.a_ && b_ == other.b_ && c_ == other.c_; + } + + bool operator!=(const Record& other) const { return !(*this == other); } + + std::pair EncodePrimaryIndexEntry() const; + + std::string EncodePrimaryKey() const; + + std::string EncodePrimaryIndexValue() const; + + std::pair EncodeSecondaryIndexEntry() const; + + std::string EncodeSecondaryKey() const; + + Status DecodePrimaryIndexEntry(Slice primary_index_key, + Slice primary_index_value); + + Status DecodeSecondaryIndexEntry(Slice secondary_index_key, + Slice secondary_index_value); + + uint32_t a_value() const { return a_; } + uint32_t b_value() const { return b_; } + uint32_t c_value() const { return c_; } + + void SetA(uint32_t _a) { a_ = _a; } + void SetB(uint32_t _b) { b_ = _b; } + void SetC(uint32_t _c) { c_ = _c; } + + std::string ToString() const { + std::string ret("("); + ret.append(std::to_string(a_)); + ret.append(","); + ret.append(std::to_string(b_)); + ret.append(","); + ret.append(std::to_string(c_)); + ret.append(")"); + return ret; + } + + private: + friend class InvariantChecker; + + uint32_t a_{0}; + uint32_t b_{0}; + uint32_t c_{0}; + }; + + MultiOpsTxnsStressTest() {} + + ~MultiOpsTxnsStressTest() override {} + + void FinishInitDb(SharedState*) override; + + void ReopenAndPreloadDb(SharedState* shared); + + bool IsStateTracked() const override { return false; } + + Status TestGet(ThreadState* thread, const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; + + std::vector TestMultiGet( + ThreadState* thread, const ReadOptions& read_opts, + const
std::vector& rand_column_families, + const std::vector& rand_keys) override; + + Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; + + // Given a key K, this creates an iterator which scans to K and then + // does a random sequence of Next/Prev operations. + Status TestIterate(ThreadState* thread, const ReadOptions& read_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; + + Status TestPut(ThreadState* thread, WriteOptions& write_opts, + const ReadOptions& read_opts, const std::vector& cf_ids, + const std::vector& keys, char (&value)[100], + std::unique_ptr& lock) override; + + Status TestDelete(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) override; + + Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) override; + + void TestIngestExternalFile(ThreadState* thread, + const std::vector& rand_column_families, + const std::vector& rand_keys, + std::unique_ptr& lock) override; + + void TestCompactRange(ThreadState* thread, int64_t rand_key, + const Slice& start_key, + ColumnFamilyHandle* column_family) override; + + Status TestBackupRestore(ThreadState* thread, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; + + Status TestCheckpoint(ThreadState* thread, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; + +#ifndef ROCKSDB_LITE + Status TestApproximateSize(ThreadState* thread, uint64_t iteration, + const std::vector& rand_column_families, + const std::vector& rand_keys) override; +#endif // !ROCKSDB_LITE + + Status TestCustomOperations( + ThreadState* thread, + const std::vector& rand_column_families) override; + + Status
PrimaryKeyUpdateTxn(ThreadState* thread, uint32_t old_a, + uint32_t new_a); + + Status SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t old_c, + uint32_t new_c); + + Status UpdatePrimaryIndexValueTxn(ThreadState* thread, uint32_t a, + uint32_t b_delta); + + Status PointLookupTxn(ThreadState* thread, ReadOptions ropts, uint32_t a); + + Status RangeScanTxn(ThreadState* thread, ReadOptions ropts, uint32_t c); + + void VerifyDb(ThreadState* thread) const override; + + protected: + uint32_t ChooseA(ThreadState* thread); + + uint32_t GenerateNextA(); + + private: + void PreloadDb(SharedState* shared, size_t num_c); + + // TODO (yanqin) encapsulate the selection of keys in a separate class. + std::atomic next_a_{0}; +}; + +// Compile-time checks that each of Record's private members a_, b_ and c_ is +// 4 bytes wide. Declared a friend of Record so that it can name them. +class InvariantChecker { + public: + static_assert(sizeof(MultiOpsTxnsStressTest::Record().a_) == sizeof(uint32_t), + "MultiOpsTxnsStressTest::Record::a_ must be 4 bytes"); + static_assert(sizeof(MultiOpsTxnsStressTest::Record().b_) == sizeof(uint32_t), + "MultiOpsTxnsStressTest::Record::b_ must be 4 bytes"); + static_assert(sizeof(MultiOpsTxnsStressTest::Record().c_) == sizeof(uint32_t), + "MultiOpsTxnsStressTest::Record::c_ must be 4 bytes"); +}; + +} // namespace ROCKSDB_NAMESPACE +#endif // GFLAGS diff --git a/src.mk b/src.mk index f53a417ba..d9c400e97 100644 --- a/src.mk +++ b/src.mk @@ -354,6 +354,7 @@ STRESS_LIB_SOURCES = \ db_stress_tool/db_stress_tool.cc \ db_stress_tool/expected_state.cc \ db_stress_tool/no_batched_ops_stress.cc \ + db_stress_tool/multi_ops_txns_stress.cc \ TEST_LIB_SOURCES = \ db/db_test_util.cc \