|
|
|
@ -330,6 +330,7 @@ class CfConsistencyStressTest : public StressTest { |
|
|
|
|
// This `ReadOptions` is for validation purposes. Ignore
|
|
|
|
|
// `FLAGS_rate_limit_user_ops` to avoid slowing any validation.
|
|
|
|
|
ReadOptions options(FLAGS_verify_checksum, true); |
|
|
|
|
|
|
|
|
|
// We must set total_order_seek to true because we are doing a SeekToFirst
|
|
|
|
|
// on a column family whose memtables may support (by default) prefix-based
|
|
|
|
|
// iterator. In this case, NewIterator with options.total_order_seek being
|
|
|
|
@ -338,54 +339,73 @@ class CfConsistencyStressTest : public StressTest { |
|
|
|
|
// iterate the memtable using this iterator any more, although the memtable
|
|
|
|
|
// contains the most up-to-date key-values.
|
|
|
|
|
options.total_order_seek = true; |
|
|
|
|
const auto ss_deleter = [this](const Snapshot* ss) { |
|
|
|
|
db_->ReleaseSnapshot(ss); |
|
|
|
|
}; |
|
|
|
|
std::unique_ptr<const Snapshot, decltype(ss_deleter)> snapshot_guard( |
|
|
|
|
db_->GetSnapshot(), ss_deleter); |
|
|
|
|
options.snapshot = snapshot_guard.get(); |
|
|
|
|
assert(thread != nullptr); |
|
|
|
|
auto shared = thread->shared; |
|
|
|
|
std::vector<std::unique_ptr<Iterator>> iters(column_families_.size()); |
|
|
|
|
for (size_t i = 0; i != column_families_.size(); ++i) { |
|
|
|
|
iters[i].reset(db_->NewIterator(options, column_families_[i])); |
|
|
|
|
} |
|
|
|
|
for (auto& iter : iters) { |
|
|
|
|
iter->SeekToFirst(); |
|
|
|
|
|
|
|
|
|
ManagedSnapshot snapshot_guard(db_); |
|
|
|
|
options.snapshot = snapshot_guard.snapshot(); |
|
|
|
|
|
|
|
|
|
const size_t num = column_families_.size(); |
|
|
|
|
|
|
|
|
|
std::vector<std::unique_ptr<Iterator>> iters; |
|
|
|
|
iters.reserve(num); |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num; ++i) { |
|
|
|
|
iters.emplace_back(db_->NewIterator(options, column_families_[i])); |
|
|
|
|
iters.back()->SeekToFirst(); |
|
|
|
|
} |
|
|
|
|
size_t num = column_families_.size(); |
|
|
|
|
assert(num == iters.size()); |
|
|
|
|
|
|
|
|
|
std::vector<Status> statuses(num, Status::OK()); |
|
|
|
|
|
|
|
|
|
assert(thread); |
|
|
|
|
|
|
|
|
|
auto shared = thread->shared; |
|
|
|
|
assert(shared); |
|
|
|
|
|
|
|
|
|
do { |
|
|
|
|
if (shared->HasVerificationFailedYet()) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
size_t valid_cnt = 0; |
|
|
|
|
size_t idx = 0; |
|
|
|
|
for (auto& iter : iters) { |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num; ++i) { |
|
|
|
|
const auto& iter = iters[i]; |
|
|
|
|
assert(iter); |
|
|
|
|
|
|
|
|
|
if (iter->Valid()) { |
|
|
|
|
++valid_cnt; |
|
|
|
|
const WideColumns expected_columns = GenerateExpectedWideColumns( |
|
|
|
|
GetValueBase(iter->value()), iter->value()); |
|
|
|
|
if (iter->columns() != expected_columns) { |
|
|
|
|
statuses[i] = Status::Corruption( |
|
|
|
|
"Value and columns inconsistent", |
|
|
|
|
DebugString(iter->value(), iter->columns(), expected_columns)); |
|
|
|
|
} else { |
|
|
|
|
++valid_cnt; |
|
|
|
|
} |
|
|
|
|
} else { |
|
|
|
|
statuses[idx] = iter->status(); |
|
|
|
|
statuses[i] = iter->status(); |
|
|
|
|
} |
|
|
|
|
++idx; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (valid_cnt == 0) { |
|
|
|
|
Status status; |
|
|
|
|
for (size_t i = 0; i != num; ++i) { |
|
|
|
|
for (size_t i = 0; i < num; ++i) { |
|
|
|
|
const auto& s = statuses[i]; |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
status = s; |
|
|
|
|
fprintf(stderr, "Iterator on cf %s has error: %s\n", |
|
|
|
|
column_families_[i]->GetName().c_str(), |
|
|
|
|
s.ToString().c_str()); |
|
|
|
|
shared->SetVerificationFailure(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
break; |
|
|
|
|
} else if (valid_cnt != iters.size()) { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (valid_cnt < num) { |
|
|
|
|
shared->SetVerificationFailure(); |
|
|
|
|
for (size_t i = 0; i != num; ++i) { |
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num; ++i) { |
|
|
|
|
assert(iters[i]); |
|
|
|
|
|
|
|
|
|
if (!iters[i]->Valid()) { |
|
|
|
|
if (statuses[i].ok()) { |
|
|
|
|
fprintf(stderr, "Finished scanning cf %s\n", |
|
|
|
@ -400,83 +420,105 @@ class CfConsistencyStressTest : public StressTest { |
|
|
|
|
column_families_[i]->GetName().c_str()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (shared->HasVerificationFailedYet()) { |
|
|
|
|
break; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// If the program reaches here, then all column families' iterators are
|
|
|
|
|
// still valid.
|
|
|
|
|
assert(valid_cnt == num); |
|
|
|
|
|
|
|
|
|
if (shared->PrintingVerificationResults()) { |
|
|
|
|
continue; |
|
|
|
|
} |
|
|
|
|
Slice key; |
|
|
|
|
Slice value; |
|
|
|
|
|
|
|
|
|
assert(iters[0]); |
|
|
|
|
|
|
|
|
|
const Slice key = iters[0]->key(); |
|
|
|
|
const Slice value = iters[0]->value(); |
|
|
|
|
|
|
|
|
|
int num_mismatched_cfs = 0; |
|
|
|
|
for (size_t i = 0; i != num; ++i) { |
|
|
|
|
if (i == 0) { |
|
|
|
|
key = iters[i]->key(); |
|
|
|
|
value = iters[i]->value(); |
|
|
|
|
} else { |
|
|
|
|
int cmp = key.compare(iters[i]->key()); |
|
|
|
|
if (cmp != 0) { |
|
|
|
|
++num_mismatched_cfs; |
|
|
|
|
if (1 == num_mismatched_cfs) { |
|
|
|
|
fprintf(stderr, "Verification failed\n"); |
|
|
|
|
fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n", |
|
|
|
|
db_->GetLatestSequenceNumber()); |
|
|
|
|
fprintf(stderr, "[%s] %s => %s\n", |
|
|
|
|
column_families_[0]->GetName().c_str(), |
|
|
|
|
key.ToString(true /* hex */).c_str(), |
|
|
|
|
value.ToString(true /* hex */).c_str()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for (size_t i = 1; i < num; ++i) { |
|
|
|
|
assert(iters[i]); |
|
|
|
|
|
|
|
|
|
const int cmp = key.compare(iters[i]->key()); |
|
|
|
|
|
|
|
|
|
if (cmp != 0) { |
|
|
|
|
++num_mismatched_cfs; |
|
|
|
|
|
|
|
|
|
if (1 == num_mismatched_cfs) { |
|
|
|
|
fprintf(stderr, "Verification failed\n"); |
|
|
|
|
fprintf(stderr, "Latest Sequence Number: %" PRIu64 "\n", |
|
|
|
|
db_->GetLatestSequenceNumber()); |
|
|
|
|
fprintf(stderr, "[%s] %s => %s\n", |
|
|
|
|
column_families_[i]->GetName().c_str(), |
|
|
|
|
iters[i]->key().ToString(true /* hex */).c_str(), |
|
|
|
|
iters[i]->value().ToString(true /* hex */).c_str()); |
|
|
|
|
column_families_[0]->GetName().c_str(), |
|
|
|
|
key.ToString(true /* hex */).c_str(), |
|
|
|
|
value.ToString(true /* hex */).c_str()); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fprintf(stderr, "[%s] %s => %s\n", |
|
|
|
|
column_families_[i]->GetName().c_str(), |
|
|
|
|
iters[i]->key().ToString(true /* hex */).c_str(), |
|
|
|
|
iters[i]->value().ToString(true /* hex */).c_str()); |
|
|
|
|
|
|
|
|
|
#ifndef ROCKSDB_LITE |
|
|
|
|
Slice begin_key; |
|
|
|
|
Slice end_key; |
|
|
|
|
if (cmp < 0) { |
|
|
|
|
begin_key = key; |
|
|
|
|
end_key = iters[i]->key(); |
|
|
|
|
} else { |
|
|
|
|
begin_key = iters[i]->key(); |
|
|
|
|
end_key = key; |
|
|
|
|
} |
|
|
|
|
Slice begin_key; |
|
|
|
|
Slice end_key; |
|
|
|
|
if (cmp < 0) { |
|
|
|
|
begin_key = key; |
|
|
|
|
end_key = iters[i]->key(); |
|
|
|
|
} else { |
|
|
|
|
begin_key = iters[i]->key(); |
|
|
|
|
end_key = key; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const auto print_key_versions = [&](ColumnFamilyHandle* cfh) { |
|
|
|
|
constexpr size_t kMaxNumIKeys = 8; |
|
|
|
|
|
|
|
|
|
std::vector<KeyVersion> versions; |
|
|
|
|
const size_t kMaxNumIKeys = 8; |
|
|
|
|
const auto print_key_versions = [&](ColumnFamilyHandle* cfh) { |
|
|
|
|
Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key, |
|
|
|
|
kMaxNumIKeys, &versions); |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
fprintf(stderr, "%s\n", s.ToString().c_str()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
assert(nullptr != cfh); |
|
|
|
|
fprintf(stderr, |
|
|
|
|
"Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt |
|
|
|
|
")\n", |
|
|
|
|
cfh->GetName().c_str(), |
|
|
|
|
begin_key.ToString(true /* hex */).c_str(), |
|
|
|
|
end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys); |
|
|
|
|
for (const KeyVersion& kv : versions) { |
|
|
|
|
fprintf(stderr, " key %s seq %" PRIu64 " type %d\n", |
|
|
|
|
Slice(kv.user_key).ToString(true).c_str(), kv.sequence, |
|
|
|
|
kv.type); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
if (1 == num_mismatched_cfs) { |
|
|
|
|
print_key_versions(column_families_[0]); |
|
|
|
|
const Status s = GetAllKeyVersions(db_, cfh, begin_key, end_key, |
|
|
|
|
kMaxNumIKeys, &versions); |
|
|
|
|
if (!s.ok()) { |
|
|
|
|
fprintf(stderr, "%s\n", s.ToString().c_str()); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
print_key_versions(column_families_[i]); |
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
shared->SetVerificationFailure(); |
|
|
|
|
|
|
|
|
|
assert(cfh); |
|
|
|
|
|
|
|
|
|
fprintf(stderr, |
|
|
|
|
"Internal keys in CF '%s', [%s, %s] (max %" ROCKSDB_PRIszt |
|
|
|
|
")\n", |
|
|
|
|
cfh->GetName().c_str(), |
|
|
|
|
begin_key.ToString(true /* hex */).c_str(), |
|
|
|
|
end_key.ToString(true /* hex */).c_str(), kMaxNumIKeys); |
|
|
|
|
|
|
|
|
|
for (const KeyVersion& kv : versions) { |
|
|
|
|
fprintf(stderr, " key %s seq %" PRIu64 " type %d\n", |
|
|
|
|
Slice(kv.user_key).ToString(true).c_str(), kv.sequence, |
|
|
|
|
kv.type); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
if (1 == num_mismatched_cfs) { |
|
|
|
|
print_key_versions(column_families_[0]); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
print_key_versions(column_families_[i]); |
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
|
|
|
|
|
|
shared->SetVerificationFailure(); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
shared->FinishPrintingVerificationResults(); |
|
|
|
|
|
|
|
|
|
for (auto& iter : iters) { |
|
|
|
|
assert(iter); |
|
|
|
|
iter->Next(); |
|
|
|
|
} |
|
|
|
|
} while (true); |
|
|
|
|