diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index ba55e9a1d..d0ca2d7aa 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -26,31 +26,41 @@ class BatchedOpsStressTest : public StressTest { const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys, char (&value)[100]) override { - uint32_t value_base = + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string key_suffix = Key(rand_keys[0]); + + const uint32_t value_base = thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - Slice v(value, sz); - std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; - std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"}; - Slice value_slices[10]; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const std::string value_suffix = Slice(value, sz).ToString(); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, FLAGS_batch_protection_bytes_per_key, FLAGS_user_timestamp_size); - Status s; - auto cfh = column_families_[rand_column_families[0]]; - std::string key_str = Key(rand_keys[0]); - for (int i = 0; i < 10; i++) { - keys[i] += key_str; - values[i] += v.ToString(); - value_slices[i] = values[i]; + + ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; + assert(cfh); + + for (int i = 9; i >= 0; --i) { + const std::string prefix = std::to_string(i); + + const std::string k = prefix + key_suffix; + const std::string v = prefix + value_suffix; + if (FLAGS_use_merge) { - batch.Merge(cfh, keys[i], value_slices[i]); + batch.Merge(cfh, k, v); + } else if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); + } else { - batch.Put(cfh, keys[i], value_slices[i]); + batch.Put(cfh, k, v); } } - s = 
db_->Write(write_opts, &batch); + const Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { fprintf(stderr, "multiput error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index 05c298a61..d4ec84635 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -25,22 +25,36 @@ class CfConsistencyStressTest : public StressTest { const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys, char (&value)[100]) override { - std::string key_str = Key(rand_keys[0]); - Slice key = key_str; - uint64_t value_base = batch_id_.fetch_add(1); - size_t sz = - GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value)); - Slice v(value, sz); + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string k = Key(rand_keys[0]); + + const uint32_t value_base = batch_id_.fetch_add(1); + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const Slice v(value, sz); + WriteBatch batch; + + const bool use_put_entity = !FLAGS_use_merge && + FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0; + for (auto cf : rand_column_families) { - ColumnFamilyHandle* cfh = column_families_[cf]; + ColumnFamilyHandle* const cfh = column_families_[cf]; + assert(cfh); + if (FLAGS_use_merge) { - batch.Merge(cfh, key, v); - } else { /* !FLAGS_use_merge */ - batch.Put(cfh, key, v); + batch.Merge(cfh, k, v); + } else if (use_put_entity) { + batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v)); + } else { + batch.Put(cfh, k, v); } } + Status s = db_->Write(write_opts, &batch); + if (!s.ok()) { fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); @@ -538,7 +552,7 @@ class CfConsistencyStressTest : public StressTest { } private: - std::atomic<uint64_t> batch_id_; + std::atomic<uint32_t> batch_id_; }; StressTest* 
CreateCfConsistencyStressTest() { diff --git a/db_stress_tool/db_stress_common.cc b/db_stress_tool/db_stress_common.cc index 1b989de3a..241546021 100644 --- a/db_stress_tool/db_stress_common.cc +++ b/db_stress_tool/db_stress_common.cc @@ -239,6 +239,40 @@ uint32_t GetValueBase(Slice s) { return res; } +WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) { + WideColumns columns; + + constexpr size_t max_columns = 4; + const size_t num_columns = (value_base % max_columns) + 1; + + columns.reserve(num_columns); + + assert(slice.size() >= num_columns); + + columns.emplace_back(kDefaultWideColumnName, slice); + + for (size_t i = 1; i < num_columns; ++i) { + const Slice name(slice.data(), i); + const Slice value(slice.data() + i, slice.size() - i); + + columns.emplace_back(name, value); + } + + return columns; +} + +WideColumns GenerateExpectedWideColumns(uint32_t value_base, + const Slice& slice) { + WideColumns columns = GenerateWideColumns(value_base, slice); + + std::sort(columns.begin(), columns.end(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) < 0; + }); + + return columns; +} + std::string GetNowNanos() { uint64_t t = db_stress_env->NowNanos(); std::string ret; diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h index 9f8d78960..676d5f758 100644 --- a/db_stress_tool/db_stress_common.h +++ b/db_stress_tool/db_stress_common.h @@ -235,6 +235,7 @@ DECLARE_bool(in_place_update); DECLARE_string(memtablerep); DECLARE_int32(prefix_size); DECLARE_bool(use_merge); +DECLARE_uint32(use_put_entity_one_in); DECLARE_bool(use_full_merge_v1); DECLARE_int32(sync_wal_one_in); DECLARE_bool(avoid_unnecessary_blocking_io); @@ -620,6 +621,10 @@ extern std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys, extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz); extern uint32_t GetValueBase(Slice s); +extern WideColumns GenerateWideColumns(uint32_t value_base, const 
Slice& slice); +extern WideColumns GenerateExpectedWideColumns(uint32_t value_base, + const Slice& slice); + extern StressTest* CreateCfConsistencyStressTest(); extern StressTest* CreateBatchedOpsStressTest(); extern StressTest* CreateNonBatchedOpsStressTest(); diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc index 3639895e4..e0a273320 100644 --- a/db_stress_tool/db_stress_gflags.cc +++ b/db_stress_tool/db_stress_gflags.cc @@ -858,6 +858,10 @@ DEFINE_bool(use_merge, false, "On true, replaces all writes with a Merge " "that behaves like a Put"); +DEFINE_uint32(use_put_entity_one_in, 0, + "If greater than zero, PutEntity will be used once per every N " + "write ops on average."); + DEFINE_bool(use_full_merge_v1, false, "On true, use a merge operator that implement the deprecated " "version of FullMerge"); diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc index f04bc672a..55e61a6a7 100644 --- a/db_stress_tool/db_stress_test_base.cc +++ b/db_stress_tool/db_stress_test_base.cc @@ -457,11 +457,14 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, Status s; for (auto cfh : column_families_) { for (int64_t k = 0; k != number_of_keys; ++k) { - std::string key_str = Key(k); - Slice key = key_str; - size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value)); - Slice v(value, sz); - shared->Put(cf_idx, k, 0, true /* pending */); + const std::string key = Key(k); + + constexpr uint32_t value_base = 0; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + + const Slice v(value, sz); + + shared->Put(cf_idx, k, value_base, true /* pending */); if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -478,13 +481,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, } #endif } + } else if (FLAGS_use_put_entity_one_in > 0) { + s = db_->PutEntity(write_opts, cfh, key, + GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { - std::string 
ts_str; - Slice ts; if (FLAGS_user_timestamp_size > 0) { - ts_str = GetNowNanos(); - ts = ts_str; + const std::string ts = GetNowNanos(); s = db_->Put(write_opts, cfh, key, ts, v); } else { s = db_->Put(write_opts, cfh, key, v); @@ -503,7 +506,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys, } } - shared->Put(cf_idx, k, 0, false /* pending */); + shared->Put(cf_idx, k, value_base, false /* pending */); if (!s.ok()) { break; } diff --git a/db_stress_tool/db_stress_tool.cc b/db_stress_tool/db_stress_tool.cc index 6a85b7f30..f7b768aa5 100644 --- a/db_stress_tool/db_stress_tool.cc +++ b/db_stress_tool/db_stress_tool.cc @@ -294,6 +294,15 @@ int db_stress_tool(int argc, char** argv) { exit(1); } + if (FLAGS_use_put_entity_one_in > 0 && + (FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn || + FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) { + fprintf(stderr, + "PutEntity is currently incompatible with Merge, transactions, and " + "user-defined timestamps\n"); + exit(1); + } + #ifndef NDEBUG KillPoint* kp = KillPoint::GetInstance(); kp->rocksdb_kill_odds = FLAGS_kill_random_test; diff --git a/db_stress_tool/expected_state.cc b/db_stress_tool/expected_state.cc index c9720e59d..d08403b76 100644 --- a/db_stress_tool/expected_state.cc +++ b/db_stress_tool/expected_state.cc @@ -7,6 +7,7 @@ #include "db_stress_tool/expected_state.h" +#include "db/wide/wide_column_serialization.h" #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" #include "rocksdb/trace_reader_writer.h" @@ -405,6 +406,50 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler, return Status::OK(); } + Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts, + const Slice& entity) override { + Slice key = + StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size); + + uint64_t key_id = 0; + if (!GetIntVal(key.ToString(), &key_id)) { + return Status::Corruption("Unable to parse 
key", key.ToString()); + } + + Slice entity_copy = entity; + WideColumns columns; + if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) { + return Status::Corruption("Unable to deserialize entity", + entity.ToString(/* hex */ true)); + } + + if (columns.empty() || columns[0].name() != kDefaultWideColumnName) { + return Status::Corruption("Cannot find default column in entity", + entity.ToString(/* hex */ true)); + } + + const Slice& value_of_default = columns[0].value(); + + const uint32_t value_base = GetValueBase(value_of_default); + + if (columns != GenerateExpectedWideColumns(value_base, value_of_default)) { + return Status::Corruption("Wide columns in entity inconsistent", + entity.ToString(/* hex */ true)); + } + + if (buffered_writes_) { + return WriteBatchInternal::PutEntity(buffered_writes_.get(), + column_family_id, key, columns); + } + + state_->Put(column_family_id, static_cast<int64_t>(key_id), value_base, + false /* pending */); + + ++num_write_ops_; + + return Status::OK(); + } + Status DeleteCF(uint32_t column_family_id, const Slice& key_with_ts) override { Slice key = diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index ff9ca38a8..004387944 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -57,10 +57,14 @@ class NonBatchedOpsStressTest : public StressTest { kNumberOfMethods }; - const int num_methods = + constexpr int num_methods = static_cast<int>(VerificationMethod::kNumberOfMethods); + + // Note: Merge/GetMergeOperands is currently not supported for wide-column + // entities const VerificationMethod method = - static_cast<VerificationMethod>(thread->rand.Uniform(num_methods)); + static_cast<VerificationMethod>(thread->rand.Uniform( + FLAGS_use_put_entity_one_in > 0 ? 
num_methods - 1 : num_methods)); if (method == VerificationMethod::kIterator) { std::unique_ptr<Iterator> iter( @@ -88,13 +92,11 @@ } Status s = iter->status(); - Slice iter_key; + std::string from_db; if (iter->Valid()) { - iter_key = iter->key(); - - const int diff = iter_key.compare(k); + const int diff = iter->key().compare(k); if (diff > 0) { s = Status::NotFound(); @@ -708,38 +710,44 @@ const std::vector<int>& rand_column_families, const std::vector<int64_t>& rand_keys, char (&value)[100]) override { + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + auto shared = thread->shared; - int64_t max_key = shared->GetMaxKey(); + assert(shared); + + const int64_t max_key = shared->GetMaxKey(); + int64_t rand_key = rand_keys[0]; int rand_column_family = rand_column_families[0]; - std::string write_ts_str; - Slice write_ts; + std::string write_ts; + std::unique_ptr<MutexLock> lock( new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); while (!shared->AllowsOverwrite(rand_key) && (FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) { lock.reset(); + rand_key = thread->rand.Next() % max_key; rand_column_family = thread->rand.Next() % FLAGS_column_families; + lock.reset( new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key))); if (FLAGS_user_timestamp_size > 0) { - write_ts_str = GetNowNanos(); - write_ts = write_ts_str; + write_ts = GetNowNanos(); } } - if (write_ts.size() == 0 && FLAGS_user_timestamp_size) { - write_ts_str = GetNowNanos(); - write_ts = write_ts_str; + + if (write_ts.empty() && FLAGS_user_timestamp_size) { + write_ts = GetNowNanos(); } - std::string key_str = Key(rand_key); - Slice key = key_str; - ColumnFamilyHandle* cfh = column_families_[rand_column_family]; + const std::string k = Key(rand_key); + + ColumnFamilyHandle* const cfh = column_families_[rand_column_family]; + assert(cfh); if (FLAGS_verify_before_write) { - 
std::string key_str2 = Key(rand_key); - Slice k = key_str2; std::string from_db; Status s = db_->Get(read_opts, cfh, k, &from_db); if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared, @@ -747,39 +755,47 @@ class NonBatchedOpsStressTest : public StressTest { return s; } } - uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; - size_t sz = GenerateValue(value_base, value, sizeof(value)); - Slice v(value, sz); + + const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + const size_t sz = GenerateValue(value_base, value, sizeof(value)); + const Slice v(value, sz); + shared->Put(rand_column_family, rand_key, value_base, true /* pending */); + Status s; + if (FLAGS_use_merge) { if (!FLAGS_use_txn) { - s = db_->Merge(write_opts, cfh, key, v); + s = db_->Merge(write_opts, cfh, k, v); } else { #ifndef ROCKSDB_LITE Transaction* txn; s = NewTxn(write_opts, &txn); if (s.ok()) { - s = txn->Merge(cfh, key, v); + s = txn->Merge(cfh, k, v); if (s.ok()) { s = CommitTxn(txn, thread); } } #endif } + } else if (FLAGS_use_put_entity_one_in > 0 && + (value_base % FLAGS_use_put_entity_one_in) == 0) { + s = db_->PutEntity(write_opts, cfh, k, + GenerateWideColumns(value_base, v)); } else { if (!FLAGS_use_txn) { if (FLAGS_user_timestamp_size == 0) { - s = db_->Put(write_opts, cfh, key, v); + s = db_->Put(write_opts, cfh, k, v); } else { - s = db_->Put(write_opts, cfh, key, write_ts, v); + s = db_->Put(write_opts, cfh, k, write_ts, v); } } else { #ifndef ROCKSDB_LITE Transaction* txn; s = NewTxn(write_opts, &txn); if (s.ok()) { - s = txn->Put(cfh, key, v); + s = txn->Put(cfh, k, v); if (s.ok()) { s = CommitTxn(txn, thread); } @@ -787,7 +803,9 @@ class NonBatchedOpsStressTest : public StressTest { #endif } } + shared->Put(rand_column_family, rand_key, value_base, false /* pending */); + if (!s.ok()) { if (FLAGS_injest_error_severity >= 2) { if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) { @@ -802,6 +820,7 @@ 
class NonBatchedOpsStressTest : public StressTest { std::terminate(); } } + thread->stats.AddBytesForWrites(1, sz); PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value, sz); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 953cabee9..5216c845a 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -124,6 +124,8 @@ default_params = { # fast_lru_cache is incompatible with stress tests, because it doesn't support strict_capacity_limit == false. "use_full_merge_v1": lambda: random.randint(0, 1), "use_merge": lambda: random.randint(0, 1), + # use_put_entity_one_in has to be the same across invocations for verification to work, hence no lambda + "use_put_entity_one_in": random.choice([0] * 7 + [1, 5, 10]), # 999 -> use Bloom API "ribbon_starting_level": lambda: random.choice([random.randint(-1, 10), 999]), "value_size_mult": 32, @@ -348,6 +350,8 @@ txn_params = { # pipeline write is not currnetly compatible with WritePrepared txns "enable_pipelined_write": 0, "create_timestamped_snapshot_one_in": random.choice([0, 20]), + # PutEntity in transactions is not yet implemented + "use_put_entity_one_in" : 0, } best_efforts_recovery_params = { @@ -392,6 +396,8 @@ ts_params = { "enable_blob_files": 0, "use_blob_db": 0, "ingest_external_file_one_in": 0, + # PutEntity with timestamps is not yet implemented + "use_put_entity_one_in" : 0, } tiered_params = { @@ -446,6 +452,8 @@ multiops_txn_default_params = { "enable_compaction_filter": 0, "create_timestamped_snapshot_one_in": 50, "sync_fault_injection": 0, + # PutEntity in transactions is not yet implemented + "use_put_entity_one_in" : 0, } multiops_wc_txn_params = { @@ -595,6 +603,12 @@ def finalize_and_sanitize(src_params): # compatible with only write committed policy if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0): dest_params["sync_fault_injection"] = 0 + + # PutEntity is currently not supported with Merge + if dest_params["use_put_entity_one_in"] != 0: + 
dest_params["use_merge"] = 0 + dest_params["use_full_merge_v1"] = 0 + return dest_params