From 7af47c532bc44823d03c1446e3eaec849f68e864 Mon Sep 17 00:00:00 2001 From: Levi Tamasi Date: Fri, 7 Oct 2022 11:17:57 -0700 Subject: [PATCH] Verify wide columns during prefix scan in stress tests (#10786) Summary: The patch adds checks to the `{NonBatchedOps,BatchedOps,CfConsistency}StressTest::TestPrefixScan` methods to make sure the wide columns exposed by the iterators are as expected (based on the value base encoded into the iterator value). It also makes some code hygiene improvements in these methods. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10786 Test Plan: Ran some simple blackbox tests in the various modes (non-batched, batched, CF consistency). Reviewed By: riversand963 Differential Revision: D40163623 Pulled By: riversand963 fbshipit-source-id: 72f4c3b51063e48c15f974c4ec64d751d3ed0a83 --- db_stress_tool/batched_ops_stress.cc | 117 +++++++++++++++--------- db_stress_tool/cf_consistency_stress.cc | 55 ++++++++--- db_stress_tool/db_stress_test_base.h | 2 - db_stress_tool/no_batched_ops_stress.cc | 46 +++++++--- 4 files changed, 148 insertions(+), 72 deletions(-) diff --git a/db_stress_tool/batched_ops_stress.cc b/db_stress_tool/batched_ops_stress.cc index d0ca2d7aa..b13bed533 100644 --- a/db_stress_tool/batched_ops_stress.cc +++ b/db_stress_tool/batched_ops_stress.cc @@ -264,88 +264,117 @@ class BatchedOpsStressTest : public StressTest { Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, const std::vector& rand_column_families, const std::vector& rand_keys) override { - size_t prefix_to_use = - (FLAGS_prefix_size < 0) ? 7 : static_cast(FLAGS_prefix_size); - std::string key_str = Key(rand_keys[0]); - Slice key = key_str; - auto cfh = column_families_[rand_column_families[0]]; - std::string prefixes[10] = {"0", "1", "2", "3", "4", - "5", "6", "7", "8", "9"}; - Slice prefix_slices[10]; - ReadOptions readoptionscopy[10]; - const Snapshot* snapshot = db_->GetSnapshot(); - Iterator* iters[10]; - std::string upper_bounds[10]; - Slice ub_slices[10]; - Status s = Status::OK(); - for (int i = 0; i < 10; i++) { - prefixes[i] += key.ToString(); - prefixes[i].resize(prefix_to_use); - prefix_slices[i] = Slice(prefixes[i]); - readoptionscopy[i] = readoptions; - readoptionscopy[i].snapshot = snapshot; + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string key = Key(rand_keys[0]); + + assert(FLAGS_prefix_size > 0); + const size_t prefix_to_use = static_cast(FLAGS_prefix_size); + + constexpr size_t num_prefixes = 10; + + std::array prefixes; + std::array prefix_slices; + std::array ro_copies; + std::array upper_bounds; + std::array ub_slices; + std::array, num_prefixes> iters; + + const Snapshot* const snapshot = db_->GetSnapshot(); + + ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; + assert(cfh); + + for (size_t i = 0; i < num_prefixes; ++i) { + prefixes[i] = std::to_string(i) + key; + prefix_slices[i] = Slice(prefixes[i].data(), prefix_to_use); + + ro_copies[i] = readoptions; + ro_copies[i].snapshot = snapshot; if (thread->rand.OneIn(2) && GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) { // For half of the time, set the upper bound to the next prefix - ub_slices[i] = Slice(upper_bounds[i]); - readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]); + ub_slices[i] = upper_bounds[i]; + ro_copies[i].iterate_upper_bound = &(ub_slices[i]); } - iters[i] = db_->NewIterator(readoptionscopy[i], cfh); + + iters[i].reset(db_->NewIterator(ro_copies[i], cfh)); iters[i]->Seek(prefix_slices[i]); } - long count = 0; + uint64_t count = 0; + while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) { - count++; - std::string values[10]; + ++count; + + std::array values; + // get list of all values for this iteration - for (int i = 0; i < 10; i++) { + for (size_t i = 0; i < num_prefixes; ++i) { // no iterator should finish before the first one assert(iters[i]->Valid() && iters[i]->key().starts_with(prefix_slices[i])); values[i] = iters[i]->value().ToString(); - char expected_first = (prefixes[i])[0]; - char actual_first = (values[i])[0]; + // make sure the first character of the value is the expected digit + const char expected_first = prefixes[i][0]; + const char actual_first = values[i][0]; if (actual_first != expected_first) { fprintf(stderr, "error expected first = %c actual = %c\n", expected_first, actual_first); } - (values[i])[0] = ' '; // blank out the differing character - } - // make sure all values are equivalent - for (int i = 0; i < 10; i++) { + + values[i][0] = ' '; // blank out the differing character + + // make sure all values are equivalent if (values[i] != values[0]) { fprintf(stderr, - "error : %d, inconsistent values for prefix %s: %s, %s\n", i, - prefixes[i].c_str(), StringToHex(values[0]).c_str(), + "error : %" ROCKSDB_PRIszt + ", inconsistent values for prefix %s: %s, %s\n", + i, prefix_slices[i].ToString(/* hex */ true).c_str(), + StringToHex(values[0]).c_str(), StringToHex(values[i]).c_str()); // we continue after error rather than exiting so that we can // find more errors if any } + + // make sure value() and columns() are consistent + // note: in these tests, value base is stored after a single-digit + // prefix + Slice value_base_slice = iters[i]->value(); + value_base_slice.remove_prefix(1); + + const WideColumns expected_columns = GenerateExpectedWideColumns( + GetValueBase(value_base_slice), iters[i]->value()); + if (iters[i]->columns() != expected_columns) { + fprintf(stderr, + "error : %" ROCKSDB_PRIszt + ", value and columns inconsistent for prefix %s: %s\n", + i, prefix_slices[i].ToString(/* hex */ true).c_str(), + DebugString(iters[i]->value(), iters[i]->columns(), + expected_columns) + .c_str()); + } + iters[i]->Next(); } } // cleanup iterators and snapshot - for (int i = 0; i < 10; i++) { + for (size_t i = 0; i < num_prefixes; ++i) { // if the first iterator finished, they should have all finished assert(!iters[i]->Valid() || !iters[i]->key().starts_with(prefix_slices[i])); assert(iters[i]->status().ok()); - delete iters[i]; } + db_->ReleaseSnapshot(snapshot); - if (s.ok()) { - thread->stats.AddPrefixes(1, count); - } else { - fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); - thread->stats.AddErrors(1); - } + thread->stats.AddPrefixes(1, count); - return s; + return Status::OK(); } void VerifyDb(ThreadState* /* thread */) const override {} diff --git a/db_stress_tool/cf_consistency_stress.cc b/db_stress_tool/cf_consistency_stress.cc index d4ec84635..5fd4d7e5a 100644 --- a/db_stress_tool/cf_consistency_stress.cc +++ b/db_stress_tool/cf_consistency_stress.cc @@ -254,42 +254,69 @@ class CfConsistencyStressTest : public StressTest { Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions, const std::vector& rand_column_families, const std::vector& rand_keys) override { - size_t prefix_to_use = + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + const std::string key = Key(rand_keys[0]); + + const size_t prefix_to_use = (FLAGS_prefix_size < 0) ? 7 : static_cast(FLAGS_prefix_size); - std::string key_str = Key(rand_keys[0]); - Slice key = key_str; - Slice prefix = Slice(key.data(), prefix_to_use); + const Slice prefix(key.data(), prefix_to_use); std::string upper_bound; Slice ub_slice; + ReadOptions ro_copy = readoptions; + // Get the next prefix first and then see if we want to set upper bound. // We'll use the next prefix in an assertion later on if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) { ub_slice = Slice(upper_bound); ro_copy.iterate_upper_bound = &ub_slice; } - auto cfh = - column_families_[rand_column_families[thread->rand.Next() % - rand_column_families.size()]]; - Iterator* iter = db_->NewIterator(ro_copy, cfh); - unsigned long count = 0; + + ColumnFamilyHandle* const cfh = + column_families_[rand_column_families[thread->rand.Uniform( + static_cast(rand_column_families.size()))]]; + assert(cfh); + + std::unique_ptr iter(db_->NewIterator(ro_copy, cfh)); + + uint64_t count = 0; + Status s; + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ++count; + + const WideColumns expected_columns = GenerateExpectedWideColumns( + GetValueBase(iter->value()), iter->value()); + if (iter->columns() != expected_columns) { + s = Status::Corruption( + "Value and columns inconsistent", + DebugString(iter->value(), iter->columns(), expected_columns)); + break; + } } + assert(prefix_to_use == 0 || count <= GetPrefixKeyCount(prefix.ToString(), upper_bound)); - Status s = iter->status(); + if (s.ok()) { - thread->stats.AddPrefixes(1, count); - } else { + s = iter->status(); + } + + if (!s.ok()) { fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); + + return s; } - delete iter; - return s; + + thread->stats.AddPrefixes(1, count); + + return Status::OK(); } ColumnFamilyHandle* GetControlCfh(ThreadState* thread, diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index 1f9737e39..81fbbe24b 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -10,8 +10,6 @@ #ifdef GFLAGS #pragma once -#include - #include "db_stress_tool/db_stress_common.h" #include "db_stress_tool/db_stress_shared_state.h" diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index 859cf51d4..147d6cdae 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -671,14 +671,19 @@ class NonBatchedOpsStressTest : public StressTest { Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts, const std::vector& rand_column_families, const std::vector& rand_keys) override { - auto cfh = column_families_[rand_column_families[0]]; - std::string key_str = Key(rand_keys[0]); - Slice key = key_str; - Slice prefix = Slice(key.data(), FLAGS_prefix_size); + assert(!rand_column_families.empty()); + assert(!rand_keys.empty()); + + ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]]; + assert(cfh); + + const std::string key = Key(rand_keys[0]); + const Slice prefix(key.data(), FLAGS_prefix_size); std::string upper_bound; Slice ub_slice; ReadOptions ro_copy = read_opts; + // Get the next prefix first and then see if we want to set upper bound. // We'll use the next prefix in an assertion later on if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) { @@ -692,26 +697,43 @@ class NonBatchedOpsStressTest : public StressTest { MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice, ro_copy); - Iterator* iter = db_->NewIterator(ro_copy, cfh); - unsigned long count = 0; + std::unique_ptr iter(db_->NewIterator(ro_copy, cfh)); + + uint64_t count = 0; + Status s; + for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) { ++count; + + const WideColumns expected_columns = GenerateExpectedWideColumns( + GetValueBase(iter->value()), iter->value()); + if (iter->columns() != expected_columns) { + s = Status::Corruption( + "Value and columns inconsistent", + DebugString(iter->value(), iter->columns(), expected_columns)); + break; + } } if (ro_copy.iter_start_ts == nullptr) { assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound)); } - Status s = iter->status(); - if (iter->status().ok()) { - thread->stats.AddPrefixes(1, count); - } else { + if (s.ok()) { + s = iter->status(); + } + + if (!s.ok()) { fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str()); thread->stats.AddErrors(1); + + return s; } - delete iter; - return s; + + thread->stats.AddPrefixes(1, count); + + return Status::OK(); } Status TestPut(ThreadState* thread, WriteOptions& write_opts,