Check wide columns in TestIterateAgainstExpected (#10820)

Summary: Pull Request resolved: https://github.com/facebook/rocksdb/pull/10820

Reviewed By: riversand963

Differential Revision: D40363653

Pulled By: ltamasi

fbshipit-source-id: d347547d8cdd3f8926b35b6af4d1fa0f827e4a10
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 3cd78bce1e
commit 2f3042d732
  1. 26
      db_stress_tool/db_stress_shared_state.h
  2. 137
      db_stress_tool/no_batched_ops_stress.cc

@ -215,6 +215,32 @@ class SharedState {
}
}
// Returns a collection of mutex locks covering the key range [start, end) in
// `cf`.
std::vector<std::unique_ptr<MutexLock>> GetLocksForKeyRange(int cf,
int64_t start,
int64_t end) {
std::vector<std::unique_ptr<MutexLock>> range_locks;
if (start >= end) {
return range_locks;
}
const int64_t start_idx = start >> log2_keys_per_lock_;
int64_t end_idx = end >> log2_keys_per_lock_;
if ((end & ((1 << log2_keys_per_lock_) - 1)) == 0) {
--end_idx;
}
for (int64_t idx = start_idx; idx <= end_idx; ++idx) {
range_locks.emplace_back(
std::make_unique<MutexLock>(&key_locks_[cf][idx]));
}
return range_locks;
}
Status SaveAtAndAfter(DB* db) {
return expected_state_manager_->SaveAtAndAfter(db);
}

@ -1127,33 +1127,41 @@ class NonBatchedOpsStressTest : public StressTest {
ThreadState* thread, const ReadOptions& read_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) override {
// Lock the whole range over which we might iterate to ensure it doesn't
// change under us.
std::vector<std::unique_ptr<MutexLock>> range_locks;
int64_t lb = rand_keys[0];
int rand_column_family = rand_column_families[0];
assert(thread);
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
auto shared = thread->shared;
assert(shared);
int64_t max_key = shared->GetMaxKey();
if (static_cast<uint64_t>(lb) > max_key - FLAGS_num_iterations) {
lb = thread->rand.Next() % (max_key - FLAGS_num_iterations + 1);
}
for (int j = 0; j < static_cast<int>(FLAGS_num_iterations); ++j) {
if (j == 0 || ((lb + j) & ((1 << FLAGS_log2_keys_per_lock) - 1)) == 0) {
range_locks.emplace_back(
new MutexLock(shared->GetMutexForKey(rand_column_family, lb + j)));
}
const int64_t num_iter = static_cast<int64_t>(FLAGS_num_iterations);
int64_t lb = rand_keys[0];
if (lb > max_key - num_iter) {
lb = thread->rand.Next() % (max_key - num_iter + 1);
}
int64_t ub = lb + FLAGS_num_iterations;
// Locks acquired for [lb, ub)
ReadOptions readoptscopy(read_opts);
const int64_t ub = lb + num_iter;
// Lock the whole range over which we might iterate to ensure it doesn't
// change under us.
const int rand_column_family = rand_column_families[0];
std::vector<std::unique_ptr<MutexLock>> range_locks =
shared->GetLocksForKeyRange(rand_column_family, lb, ub);
ReadOptions ro(read_opts);
ro.total_order_seek = true;
std::string read_ts_str;
Slice read_ts;
if (FLAGS_user_timestamp_size > 0) {
read_ts_str = GetNowNanos();
read_ts = read_ts_str;
readoptscopy.timestamp = &read_ts;
ro.timestamp = &read_ts;
}
readoptscopy.total_order_seek = true;
std::string max_key_str;
Slice max_key_slice;
if (!FLAGS_destroy_db_initially) {
@ -1162,11 +1170,41 @@ class NonBatchedOpsStressTest : public StressTest {
// to restrict iterator from reading keys written in batched_op_stress
// that do not have expected state updated and may not be parseable by
// GetIntVal().
readoptscopy.iterate_upper_bound = &max_key_slice;
ro.iterate_upper_bound = &max_key_slice;
}
auto cfh = column_families_[rand_column_family];
ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
assert(cfh);
std::unique_ptr<Iterator> iter(db_->NewIterator(ro, cfh));
std::string op_logs;
std::unique_ptr<Iterator> iter(db_->NewIterator(readoptscopy, cfh));
auto check_columns = [&]() {
assert(iter);
assert(iter->Valid());
const WideColumns expected_columns = GenerateExpectedWideColumns(
GetValueBase(iter->value()), iter->value());
if (iter->columns() != expected_columns) {
shared->SetVerificationFailure();
fprintf(stderr,
"Verification failed for key %s: "
"Value and columns inconsistent: %s\n",
Slice(iter->key()).ToString(/* hex */ true).c_str(),
DebugString(iter->value(), iter->columns(), expected_columns)
.c_str());
fprintf(stderr, "Column family: %s, op_logs: %s\n",
cfh->GetName().c_str(), op_logs.c_str());
thread->stats.AddErrors(1);
return false;
}
return true;
};
auto check_no_key_in_range = [&](int64_t start, int64_t end) {
for (auto j = std::max(start, lb); j < std::min(end, ub); ++j) {
@ -1198,10 +1236,13 @@ class NonBatchedOpsStressTest : public StressTest {
// The random sequence Next and Prev test below tends to be very short
// ranged.
int64_t last_key = lb - 1;
std::string key_str = Key(lb);
iter->Seek(Slice(key_str));
iter->Seek(key_str);
op_logs += "S " + Slice(key_str).ToString(true) + " ";
uint64_t curr;
uint64_t curr = 0;
while (true) {
if (!iter->Valid()) {
if (!iter->status().ok()) {
@ -1213,29 +1254,38 @@ class NonBatchedOpsStressTest : public StressTest {
thread->stats.AddErrors(1);
return iter->status();
}
if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(ub))) {
// error reported in check_no_key_in_range()
if (!check_no_key_in_range(last_key + 1, ub)) {
return Status::OK();
}
break;
}
if (!check_columns()) {
return Status::OK();
}
// iter is valid, the range (last_key, current key) was skipped
GetIntVal(iter->key().ToString(), &curr);
if (!check_no_key_in_range(last_key + 1, static_cast<int64_t>(curr))) {
return Status::OK();
}
last_key = static_cast<int64_t>(curr);
if (last_key >= ub - 1) {
break;
}
iter->Next();
op_logs += "N";
}
// backward scan
key_str = Key(ub - 1);
iter->SeekForPrev(Slice(key_str));
iter->SeekForPrev(key_str);
op_logs += " SFP " + Slice(key_str).ToString(true) + " ";
last_key = ub;
while (true) {
if (!iter->Valid()) {
@ -1253,16 +1303,24 @@ class NonBatchedOpsStressTest : public StressTest {
}
break;
}
if (!check_columns()) {
return Status::OK();
}
// the range (current key, last key) was skipped
GetIntVal(iter->key().ToString(), &curr);
if (!check_no_key_in_range(static_cast<int64_t>(curr + 1), last_key)) {
return Status::OK();
}
last_key = static_cast<int64_t>(curr);
if (last_key <= lb) {
break;
}
iter->Prev();
op_logs += "P";
}
@ -1271,11 +1329,14 @@ class NonBatchedOpsStressTest : public StressTest {
// change. It is safe to refresh since the testing key range is locked.
iter->Refresh();
}
// start from middle of [lb, ub) otherwise it is easy to iterate out of
// locked range
int64_t mid = lb + static_cast<int64_t>(FLAGS_num_iterations / 2);
const int64_t mid = lb + num_iter / 2;
key_str = Key(mid);
Slice key = key_str;
const Slice key(key_str);
if (thread->rand.OneIn(2)) {
iter->Seek(key);
op_logs += " S " + key.ToString(true) + " ";
@ -1295,16 +1356,20 @@ class NonBatchedOpsStressTest : public StressTest {
}
}
for (uint64_t i = 0; i < FLAGS_num_iterations && iter->Valid(); i++) {
for (int64_t i = 0; i < num_iter && iter->Valid(); ++i) {
if (!check_columns()) {
return Status::OK();
}
GetIntVal(iter->key().ToString(), &curr);
if (curr < static_cast<uint64_t>(lb)) {
if (static_cast<int64_t>(curr) < lb) {
iter->Next();
op_logs += "N";
} else if (curr >= static_cast<uint64_t>(ub)) {
} else if (static_cast<int64_t>(curr) >= ub) {
iter->Prev();
op_logs += "P";
} else {
uint32_t expected_value =
const uint32_t expected_value =
shared->Get(rand_column_family, static_cast<int64_t>(curr));
if (expected_value == shared->DELETION_SENTINEL) {
// Fail fast to preserve the DB state.
@ -1316,13 +1381,14 @@ class NonBatchedOpsStressTest : public StressTest {
thread->stats.AddErrors(1);
break;
}
if (thread->rand.OneIn(2)) {
iter->Next();
op_logs += "N";
if (!iter->Valid()) {
break;
}
uint64_t next;
uint64_t next = 0;
GetIntVal(iter->key().ToString(), &next);
if (!check_no_key_in_range(static_cast<int64_t>(curr + 1),
static_cast<int64_t>(next))) {
@ -1334,7 +1400,7 @@ class NonBatchedOpsStressTest : public StressTest {
if (!iter->Valid()) {
break;
}
uint64_t prev;
uint64_t prev = 0;
GetIntVal(iter->key().ToString(), &prev);
if (!check_no_key_in_range(static_cast<int64_t>(prev + 1),
static_cast<int64_t>(curr))) {
@ -1343,6 +1409,7 @@ class NonBatchedOpsStressTest : public StressTest {
}
}
}
if (!iter->status().ok()) {
thread->shared->SetVerificationFailure();
fprintf(stderr, "TestIterate against expected state error: %s\n",
@ -1352,7 +1419,9 @@ class NonBatchedOpsStressTest : public StressTest {
thread->stats.AddErrors(1);
return iter->status();
}
thread->stats.AddIterations(1);
return Status::OK();
}

Loading…
Cancel
Save