Verify wide columns during prefix scan in stress tests (#10786)

Summary:
The patch adds checks to the
`{NonBatchedOps,BatchedOps,CfConsistency}StressTest::TestPrefixScan` methods
to make sure the wide columns exposed by the iterators are as expected (based on
the value base encoded into the iterator value). It also makes some code hygiene
improvements in these methods.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10786

Test Plan:
Ran some simple blackbox tests in the various modes (non-batched, batched,
CF consistency).

Reviewed By: riversand963

Differential Revision: D40163623

Pulled By: riversand963

fbshipit-source-id: 72f4c3b51063e48c15f974c4ec64d751d3ed0a83
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 943247b76e
commit 7af47c532b
  1. 117
      db_stress_tool/batched_ops_stress.cc
  2. 55
      db_stress_tool/cf_consistency_stress.cc
  3. 2
      db_stress_tool/db_stress_test_base.h
  4. 46
      db_stress_tool/no_batched_ops_stress.cc

@ -264,88 +264,117 @@ class BatchedOpsStressTest : public StressTest {
Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
size_t prefix_to_use =
(FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
auto cfh = column_families_[rand_column_families[0]];
std::string prefixes[10] = {"0", "1", "2", "3", "4",
"5", "6", "7", "8", "9"};
Slice prefix_slices[10];
ReadOptions readoptionscopy[10];
const Snapshot* snapshot = db_->GetSnapshot();
Iterator* iters[10];
std::string upper_bounds[10];
Slice ub_slices[10];
Status s = Status::OK();
for (int i = 0; i < 10; i++) {
prefixes[i] += key.ToString();
prefixes[i].resize(prefix_to_use);
prefix_slices[i] = Slice(prefixes[i]);
readoptionscopy[i] = readoptions;
readoptionscopy[i].snapshot = snapshot;
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
const std::string key = Key(rand_keys[0]);
assert(FLAGS_prefix_size > 0);
const size_t prefix_to_use = static_cast<size_t>(FLAGS_prefix_size);
constexpr size_t num_prefixes = 10;
std::array<std::string, num_prefixes> prefixes;
std::array<Slice, num_prefixes> prefix_slices;
std::array<ReadOptions, num_prefixes> ro_copies;
std::array<std::string, num_prefixes> upper_bounds;
std::array<Slice, num_prefixes> ub_slices;
std::array<std::unique_ptr<Iterator>, num_prefixes> iters;
const Snapshot* const snapshot = db_->GetSnapshot();
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
for (size_t i = 0; i < num_prefixes; ++i) {
prefixes[i] = std::to_string(i) + key;
prefix_slices[i] = Slice(prefixes[i].data(), prefix_to_use);
ro_copies[i] = readoptions;
ro_copies[i].snapshot = snapshot;
if (thread->rand.OneIn(2) &&
GetNextPrefix(prefix_slices[i], &(upper_bounds[i]))) {
// For half of the time, set the upper bound to the next prefix
ub_slices[i] = Slice(upper_bounds[i]);
readoptionscopy[i].iterate_upper_bound = &(ub_slices[i]);
ub_slices[i] = upper_bounds[i];
ro_copies[i].iterate_upper_bound = &(ub_slices[i]);
}
iters[i] = db_->NewIterator(readoptionscopy[i], cfh);
iters[i].reset(db_->NewIterator(ro_copies[i], cfh));
iters[i]->Seek(prefix_slices[i]);
}
long count = 0;
uint64_t count = 0;
while (iters[0]->Valid() && iters[0]->key().starts_with(prefix_slices[0])) {
count++;
std::string values[10];
++count;
std::array<std::string, num_prefixes> values;
// get list of all values for this iteration
for (int i = 0; i < 10; i++) {
for (size_t i = 0; i < num_prefixes; ++i) {
// no iterator should finish before the first one
assert(iters[i]->Valid() &&
iters[i]->key().starts_with(prefix_slices[i]));
values[i] = iters[i]->value().ToString();
char expected_first = (prefixes[i])[0];
char actual_first = (values[i])[0];
// make sure the first character of the value is the expected digit
const char expected_first = prefixes[i][0];
const char actual_first = values[i][0];
if (actual_first != expected_first) {
fprintf(stderr, "error expected first = %c actual = %c\n",
expected_first, actual_first);
}
(values[i])[0] = ' '; // blank out the differing character
}
// make sure all values are equivalent
for (int i = 0; i < 10; i++) {
values[i][0] = ' '; // blank out the differing character
// make sure all values are equivalent
if (values[i] != values[0]) {
fprintf(stderr,
"error : %d, inconsistent values for prefix %s: %s, %s\n", i,
prefixes[i].c_str(), StringToHex(values[0]).c_str(),
"error : %" ROCKSDB_PRIszt
", inconsistent values for prefix %s: %s, %s\n",
i, prefix_slices[i].ToString(/* hex */ true).c_str(),
StringToHex(values[0]).c_str(),
StringToHex(values[i]).c_str());
// we continue after error rather than exiting so that we can
// find more errors if any
}
// make sure value() and columns() are consistent
// note: in these tests, value base is stored after a single-digit
// prefix
Slice value_base_slice = iters[i]->value();
value_base_slice.remove_prefix(1);
const WideColumns expected_columns = GenerateExpectedWideColumns(
GetValueBase(value_base_slice), iters[i]->value());
if (iters[i]->columns() != expected_columns) {
fprintf(stderr,
"error : %" ROCKSDB_PRIszt
", value and columns inconsistent for prefix %s: %s\n",
i, prefix_slices[i].ToString(/* hex */ true).c_str(),
DebugString(iters[i]->value(), iters[i]->columns(),
expected_columns)
.c_str());
}
iters[i]->Next();
}
}
// cleanup iterators and snapshot
for (int i = 0; i < 10; i++) {
for (size_t i = 0; i < num_prefixes; ++i) {
// if the first iterator finished, they should have all finished
assert(!iters[i]->Valid() ||
!iters[i]->key().starts_with(prefix_slices[i]));
assert(iters[i]->status().ok());
delete iters[i];
}
db_->ReleaseSnapshot(snapshot);
if (s.ok()) {
thread->stats.AddPrefixes(1, count);
} else {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
}
thread->stats.AddPrefixes(1, count);
return s;
return Status::OK();
}
void VerifyDb(ThreadState* /* thread */) const override {}

@ -254,42 +254,69 @@ class CfConsistencyStressTest : public StressTest {
Status TestPrefixScan(ThreadState* thread, const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
size_t prefix_to_use =
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
const std::string key = Key(rand_keys[0]);
const size_t prefix_to_use =
(FLAGS_prefix_size < 0) ? 7 : static_cast<size_t>(FLAGS_prefix_size);
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
Slice prefix = Slice(key.data(), prefix_to_use);
const Slice prefix(key.data(), prefix_to_use);
std::string upper_bound;
Slice ub_slice;
ReadOptions ro_copy = readoptions;
// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
}
auto cfh =
column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]];
Iterator* iter = db_->NewIterator(ro_copy, cfh);
unsigned long count = 0;
ColumnFamilyHandle* const cfh =
column_families_[rand_column_families[thread->rand.Uniform(
static_cast<int>(rand_column_families.size()))]];
assert(cfh);
std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
uint64_t count = 0;
Status s;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
const WideColumns expected_columns = GenerateExpectedWideColumns(
GetValueBase(iter->value()), iter->value());
if (iter->columns() != expected_columns) {
s = Status::Corruption(
"Value and columns inconsistent",
DebugString(iter->value(), iter->columns(), expected_columns));
break;
}
}
assert(prefix_to_use == 0 ||
count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
Status s = iter->status();
if (s.ok()) {
thread->stats.AddPrefixes(1, count);
} else {
s = iter->status();
}
if (!s.ok()) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
return s;
}
delete iter;
return s;
thread->stats.AddPrefixes(1, count);
return Status::OK();
}
ColumnFamilyHandle* GetControlCfh(ThreadState* thread,

@ -10,8 +10,6 @@
#ifdef GFLAGS
#pragma once
#include <ostream>
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"

@ -671,14 +671,19 @@ class NonBatchedOpsStressTest : public StressTest {
Status TestPrefixScan(ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
auto cfh = column_families_[rand_column_families[0]];
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
const std::string key = Key(rand_keys[0]);
const Slice prefix(key.data(), FLAGS_prefix_size);
std::string upper_bound;
Slice ub_slice;
ReadOptions ro_copy = read_opts;
// Get the next prefix first and then see if we want to set upper bound.
// We'll use the next prefix in an assertion later on
if (GetNextPrefix(prefix, &upper_bound) && thread->rand.OneIn(2)) {
@ -692,26 +697,43 @@ class NonBatchedOpsStressTest : public StressTest {
MaybeUseOlderTimestampForRangeScan(thread, read_ts_str, read_ts_slice,
ro_copy);
Iterator* iter = db_->NewIterator(ro_copy, cfh);
unsigned long count = 0;
std::unique_ptr<Iterator> iter(db_->NewIterator(ro_copy, cfh));
uint64_t count = 0;
Status s;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
const WideColumns expected_columns = GenerateExpectedWideColumns(
GetValueBase(iter->value()), iter->value());
if (iter->columns() != expected_columns) {
s = Status::Corruption(
"Value and columns inconsistent",
DebugString(iter->value(), iter->columns(), expected_columns));
break;
}
}
if (ro_copy.iter_start_ts == nullptr) {
assert(count <= GetPrefixKeyCount(prefix.ToString(), upper_bound));
}
Status s = iter->status();
if (iter->status().ok()) {
thread->stats.AddPrefixes(1, count);
} else {
if (s.ok()) {
s = iter->status();
}
if (!s.ok()) {
fprintf(stderr, "TestPrefixScan error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
return s;
}
delete iter;
return s;
thread->stats.AddPrefixes(1, count);
return Status::OK();
}
Status TestPut(ThreadState* thread, WriteOptions& write_opts,

Loading…
Cancel
Save