From 9078fcccee8958c870b3a5b08d35929788847e39 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Fri, 30 Sep 2022 11:11:07 -0700 Subject: [PATCH] Add the PutEntity API to the stress/crash tests (#10760) Summary: The patch adds the `PutEntity` API to the non-batched, batched, and CF consistency stress tests. Namely, when the new `db_stress` command line parameter `use_put_entity_one_in` is greater than zero, one in N writes on average is performed using `PutEntity` rather than `Put`. The wide-column entity written has the generated value in its default column; in addition, it contains up to three additional columns where the original generated value is divided up between the column name and the column value (with the column name containing the first k characters of the generated value, and the column value containing the rest). Whether `PutEntity` is used (and if so, how many columns the entity has) is completely determined by the "value base" used to generate the value (that is, there is no randomness involved). Assuming the same `use_put_entity_one_in` setting is used across `db_stress` invocations, this enables us to reconstruct and validate the entity during subsequent `db_stress` runs. Note that `PutEntity` is currently incompatible with `Merge`, transactions, and user-defined timestamps; these combinations are currently disabled/disallowed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10760 Test Plan: Ran some batched, non-batched, and CF consistency stress tests using the script. Reviewed By: riversand963 Differential Revision: D39939032 Pulled By: ltamasi fbshipit-source-id: eafdf124e95993fb7d73158e3b006d11819f7fa9 --- db_stress_tool/batched_ops_stress.cc | 42 ++++++++------ db_stress_tool/cf_consistency_stress.cc | 36 ++++++++---- db_stress_tool/db_stress_common.cc | 34 ++++++++++++ db_stress_tool/db_stress_common.h | 5 ++ db_stress_tool/db_stress_gflags.cc | 4 ++ db_stress_tool/db_stress_test_base.cc | 23 ++++---- db_stress_tool/db_stress_tool.cc | 9 +++ db_stress_tool/expected_state.cc | 45 +++++++++++++++ db_stress_tool/no_batched_ops_stress.cc | 73 ++++++++++++++++--------- tools/db_crashtest.py | 14 +++++ 10 files changed, 221 insertions(+), 64 deletions(-) diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index ba55e9a1d..d0ca2d7aa 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -26,31 +26,41 @@ class BatchedOpsStressTest : public StressTest { const std::vector& rand_column_families, const std::vector& rand_keys, char (&value)[100]) override { - uint32_t value_base = + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string key_suffix = Key(rand_keys[0]); + + const uint32_t value_base = thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - Slice v(value, sz); - std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; - std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; - Slice value_slices[10]; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const std::string value_suffix = Slice(value, sz).ToString(); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, FLAGS_batch_protection_bytes_per_key, FLAGS_user_timestamp_size); - Status s; - auto cfh = column_families_[rand_column_families[0]]; - std::string key_str = Key(rand_keys[0]); - for (int i = 0; i < 10; i++) { - keys[i] += key_str; - values[i] += v.ToString(); - value_slices[i] = values[i]; + + ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; + assert(cfh); + + for (int i = 9; i >= 0; --i) { + const std::string prefix = std::to_string(i); + + const std::string k = prefix + key_suffix; + const std::string v = prefix + value_suffix; + if (FLAGS_use_merge) { - batch.Merge(cfh, keys[i], value_slices[i]); + batch.Merge(cfh, k, v); + } else if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); } else { - batch.Put(cfh, keys[i], value_slices[i]); + batch.Put(cfh, k, v); } } - s = db_->Write(write_opts, &batch); + const Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index 05c298a61..d4ec84635 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -25,22 +25,36 @@ class CfConsistencyStressTest : public StressTest { const std::vector& rand_column_families, const std::vector& rand_keys, char (&value)[100]) override { - std::string key_str = Key(rand_keys[0]); - Slice key = key_str; - uint64_t value_base = batch_id_.fetch_add(1); - size_t sz = - GenerateValue(static_cast(value_base), value, sizeof(value)); - Slice v(value, sz); + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string k = Key(rand_keys[0]); + + const uint32_t value_base = batch_id_.fetch_add(1); + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const Slice v(value, sz); + WriteBatch batch; + + const bool use_put_entity = !FLAGS_use_merge && + FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0; + for (auto cf : rand_column_families) { - ColumnFamilyHandle* cfh = column_families_[cf]; + ColumnFamilyHandle* const cfh = column_families_[cf]; + assert(cfh); + if (FLAGS_use_merge) { - batch.Merge(cfh, key, v); - } else { /* !FLAGS_use_merge */ - batch.Put(cfh, key, v); + batch.Merge(cfh, k, v); + } else if (use_put_entity) { + batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); + } else { + batch.Put(cfh, k, v); } } + Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); @@ -538,7 +552,7 @@ class CfConsistencyStressTest : public StressTest { } private: - std::atomic batch_id_; + std::atomic batch_id_; }; StressTest* CreateCfConsistencyStressTest() { diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 1b989de3a..241546021 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -239,6 +239,40 @@ uint32_t GetValueBase(Slice s) { return res; } +WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) { + WideColumns columns; + + constexpr size_t max_columns = 4; + const size_t num_columns = (value_base % max_columns) + 1; + + columns.reserve(num_columns); + + assert(slice.size() >= num_columns); + + columns.emplace_back(kDefaultWideColumnName, slice); + + for (size_t i = 1; i < num_columns; ++i) { + const Slice name(slice.data(), i); + const Slice value(slice.data() + i, slice.size() - i); + + columns.emplace_back(name, value); + } + + return columns; +} + +WideColumns GenerateExpectedWideColumns(uint32_t value_base, + const Slice& slice) { + WideColumns columns = GenerateWideColumns(value_base, slice); + + std::sort(columns.begin(), columns.end(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) < 0; + }); + + return columns; +} + std::string GetNowNanos() { uint64_t t = db_stress_env->NowNanos(); std::string ret; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 9f8d78960..676d5f758 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -235,6 +235,7 @@ DECLARE_bool(in_place_update); DECLARE_string(memtablerep); DECLARE_int32(prefix_size); DECLARE_bool(use_merge); +DECLARE_uint32(use_put_entity_one_in); DECLARE_bool(use_full_merge_v1); DECLARE_int32(sync_wal_one_in); DECLARE_bool(avoid_unnecessary_blocking_io); @@ -620,6 +621,10 @@ extern std::vector GenerateNKeys(ThreadState* thread, int num_keys, extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz); extern uint32_t GetValueBase(Slice s); +extern WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice); +extern WideColumns GenerateExpectedWideColumns(uint32_t value_base, + const Slice& slice); + extern StressTest* CreateCfConsistencyStressTest(); extern StressTest* CreateBatchedOpsStressTest(); extern StressTest* CreateNonBatchedOpsStressTest(); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 3639895e4..e0a273320 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -858,6 +858,10 @@ DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " "that behaves like a Put"); +DEFINE_uint32(use_put_entity_one_in, 0, + "If greater than zero, PutEntity will be used once per every N " + "write ops on average."); + DEFINE_bool(use_full_merge_v1, false, "On true, use a merge operator that implement the deprecated " "version of FullMerge"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index f04bc672a..55e61a6a7 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -457,11 +457,14 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, Status s; for (auto cfh : column_families_) { for (int64_t k = 0; k != number_of_keys; ++k) { - std::string key_str = Key(k); - Slice key = key_str; - size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value)); - Slice v(value, sz); - shared->Put(cf_idx, k, 0, true /* pending */); + const std::string key = Key(k); + + constexpr uint32_t value_base = 0; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + + const Slice v(value, sz); + + shared->Put(cf_idx, k, value_base, true /* pending */); if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -478,13 +481,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, } #endif } + } else if (FLAGS_use_put_entity_one_in > 0) { + s = db_->PutEntity(write_opts, cfh, key, + GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { - std::string ts_str; - Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GetNowNanos(); - ts = ts_str; + const std::string ts = GetNowNanos(); s = db_->Put(write_opts, cfh, key, ts, v); } else { s = db_->Put(write_opts, cfh, key, v); @@ -503,7 +506,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, } } - shared->Put(cf_idx, k, 0, false /* pending */); + shared->Put(cf_idx, k, value_base, false /* pending */); if (!s.ok()) { break; } diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 6a85b7f30..f7b768aa5 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -294,6 +294,15 @@ int db_stress_tool(int argc, char** argv) { exit(1); } + if (FLAGS_use_put_entity_one_in > 0 && + (FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn || + FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) { + fprintf(stderr, + "PutEntity is currently incompatible with Merge, transactions, and " + "user-defined timestamps\n"); + exit(1); + } + #ifndef NDEBUG KillPoint* kp = KillPoint::GetInstance(); kp->rocksdb_kill_odds = FLAGS_kill_random_test; diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index c9720e59d..d08403b76 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -7,6 +7,7 @@ #include "db_stress_tool/expected_state.h" +#include "db/wide/wide_column_serialization.h" #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" #include "rocksdb/trace_reader_writer.h" @@ -405,6 +406,50 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, return Status::OK(); } + Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts, + const Slice& entity) override { + Slice key = + StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size); + + uint64_t key_id = 0; + if (!GetIntVal(key.ToString(), &key_id)) { + return Status::Corruption("Unable to parse key", key.ToString()); + } + + Slice entity_copy = entity; + WideColumns columns; + if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) { + return Status::Corruption("Unable to deserialize entity", + entity.ToString(/* hex */ true)); + } + + if (columns.empty() || columns[0].name() != kDefaultWideColumnName) { + return Status::Corruption("Cannot find default column in entity", + entity.ToString(/* hex */ true)); + } + + const Slice& value_of_default = columns[0].value(); + + const uint32_t value_base = GetValueBase(value_of_default); + + if (columns != GenerateExpectedWideColumns(value_base, value_of_default)) { + return Status::Corruption("Wide columns in entity inconsistent", + entity.ToString(/* hex */ true)); + } + + if (buffered_writes_) { + return WriteBatchInternal::PutEntity(buffered_writes_.get(), + column_family_id, key, columns); + } + + state_->Put(column_family_id, static_cast(key_id), value_base, + false /* pending */); + + ++num_write_ops_; + + return Status::OK(); + } + Status DeleteCF(uint32_t column_family_id, const Slice& key_with_ts) override { Slice key = diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index ff9ca38a8..004387944 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -57,10 +57,14 @@ class NonBatchedOpsStressTest : public StressTest { kNumberOfMethods }; - const int num_methods = + constexpr int num_methods = static_cast(VerificationMethod::kNumberOfMethods); + + // Note: Merge/GetMergeOperands is currently not supported for wide-column + // entities const VerificationMethod method = - static_cast(thread->rand.Uniform(num_methods)); + static_cast(thread->rand.Uniform( + FLAGS_use_put_entity_one_in > 0 ? num_methods - 1 : num_methods)); if (method == VerificationMethod::kIterator) { std::unique_ptr iter( @@ -88,13 +92,11 @@ class NonBatchedOpsStressTest : public StressTest { } Status s = iter->status(); - Slice iter_key; + std::string from_db; if (iter->Valid()) { - iter_key = iter->key(); - - const int diff = iter_key.compare(k); + const int diff = iter->key().compare(k); if (diff > 0) { s = Status::NotFound(); @@ -708,38 +710,44 @@ class NonBatchedOpsStressTest : public StressTest { const std::vector& rand_column_families, const std::vector& rand_keys, char (&value)[100]) override { + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + auto shared = thread->shared; - int64_t max_key = shared->GetMaxKey(); + assert(shared); + + const int64_t max_key = shared->GetMaxKey(); + int64_t rand_key = rand_keys[0]; int rand_column_family = rand_column_families[0]; - std::string write_ts_str; - Slice write_ts; + std::string write_ts; + std::unique_ptr lock( new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); while (!shared->AllowsOverwrite(rand_key) && (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { lock.reset(); + rand_key = thread->rand.Next() % max_key; rand_column_family = thread->rand.Next() % FLAGS_column_families; + lock.reset( new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); if (FLAGS_user_timestamp_size > 0) { - write_ts_str = GetNowNanos(); - write_ts = write_ts_str; + write_ts = GetNowNanos(); } } - if (write_ts.size() == 0 && FLAGS_user_timestamp_size) { - write_ts_str = GetNowNanos(); - write_ts = write_ts_str; + + if (write_ts.empty() && FLAGS_user_timestamp_size) { + write_ts = GetNowNanos(); } - std::string key_str = Key(rand_key); - Slice key = key_str; - ColumnFamilyHandle* cfh = column_families_[rand_column_family]; + const std::string k = Key(rand_key); + + ColumnFamilyHandle* const cfh = column_families_[rand_column_family]; + assert(cfh); if (FLAGS_verify_before_write) { - std::string key_str2 = Key(rand_key); - Slice k = key_str2; std::string from_db; Status s = db_->Get(read_opts, cfh, k, &from_db); if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared, @@ -747,39 +755,47 @@ class NonBatchedOpsStressTest : public StressTest { return s; } } - uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - Slice v(value, sz); + + const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const Slice v(value, sz); + shared->Put(rand_column_family, rand_key, value_base, true /* pending */); + Status s; + if (FLAGS_use_merge) { if (!FLAGS_use_txn) { - s = db_->Merge(write_opts, cfh, key, v); + s = db_->Merge(write_opts, cfh, k, v); } else { #ifndef ROCKSDB_LITE Transaction* txn; s = NewTxn(write_opts, &txn); if (s.ok()) { - s = txn->Merge(cfh, key, v); + s = txn->Merge(cfh, k, v); if (s.ok()) { s = CommitTxn(txn, thread); } } #endif } + } else if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + s = db_->PutEntity(write_opts, cfh, k, + GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { - s = db_->Put(write_opts, cfh, key, v); + s = db_->Put(write_opts, cfh, k, v); } else { - s = db_->Put(write_opts, cfh, key, write_ts, v); + s = db_->Put(write_opts, cfh, k, write_ts, v); } } else { #ifndef ROCKSDB_LITE Transaction* txn; s = NewTxn(write_opts, &txn); if (s.ok()) { - s = txn->Put(cfh, key, v); + s = txn->Put(cfh, k, v); if (s.ok()) { s = CommitTxn(txn, thread); } @@ -787,7 +803,9 @@ class NonBatchedOpsStressTest : public StressTest { #endif } } + shared->Put(rand_column_family, rand_key, value_base, false /* pending */); + if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) { @@ -802,6 +820,7 @@ class NonBatchedOpsStressTest : public StressTest { std::terminate(); } } + thread->stats.AddBytesForWrites(1, sz); PrintKeyValue(rand_column_family, static_cast(rand_key), value, sz); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 953cabee9..5216c845a 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -124,6 +124,8 @@ default_params = { # fast_lru_cache is incompatible with stress tests, because it doesn't support strict_capacity_limit == false. "use_full_merge_v1": lambda: random.randint(0, 1), "use_merge": lambda: random.randint(0, 1), + # use_put_entity_one_in has to be the same across invocations for verification to work, hence no lambda + "use_put_entity_one_in": random.choice([0] * 7 + [1, 5, 10]), # 999 -> use Bloom API "ribbon_starting_level": lambda: random.choice([random.randint(-1, 10), 999]), "value_size_mult": 32, @@ -348,6 +350,8 @@ txn_params = { # pipeline write is not currnetly compatible with WritePrepared txns "enable_pipelined_write": 0, "create_timestamped_snapshot_one_in": random.choice([0, 20]), + # PutEntity in transactions is not yet implemented + "use_put_entity_one_in" : 0, } best_efforts_recovery_params = { @@ -392,6 +396,8 @@ ts_params = { "enable_blob_files": 0, "use_blob_db": 0, "ingest_external_file_one_in": 0, + # PutEntity with timestamps is not yet implemented + "use_put_entity_one_in" : 0, } tiered_params = { @@ -446,6 +452,8 @@ multiops_txn_default_params = { "enable_compaction_filter": 0, "create_timestamped_snapshot_one_in": 50, "sync_fault_injection": 0, + # PutEntity in transactions is not yet implemented + "use_put_entity_one_in" : 0, } multiops_wc_txn_params = { @@ -595,6 +603,12 @@ def finalize_and_sanitize(src_params): # compatible with only write committed policy if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): dest_params["sync_fault_injection"] = 0 + + # PutEntity is currently not supported with Merge + if dest_params["use_put_entity_one_in"] != 0: + dest_params["use_merge"] = 0 + dest_params["use_full_merge_v1"] = 0 + return dest_params