Add the PutEntity API to the stress/crash tests (#10760)

Summary:
The patch adds the `PutEntity` API to the non-batched, batched, and
CF consistency stress tests. Namely, when the new `db_stress` command
line parameter `use_put_entity_one_in` is greater than zero, one in
N writes on average is performed using `PutEntity` rather than `Put`.
The wide-column entity written has the generated value in its default
column; in addition, it contains up to three additional columns where
the original generated value is divided up between the column name and the
column value (with the column name containing the first k characters of
the generated value, and the column value containing the rest). Whether
`PutEntity` is used (and if so, how many columns the entity has) is completely
determined by the "value base" used to generate the value (that is, there is
no randomness involved). Assuming the same `use_put_entity_one_in` setting
is used across `db_stress` invocations, this enables us to reconstruct and
validate the entity during subsequent `db_stress` runs.

Note that `PutEntity` is currently incompatible with `Merge`, transactions, and
user-defined timestamps; these combinations are currently disabled/disallowed.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10760

Test Plan: Ran some batched, non-batched, and CF consistency stress tests using the `tools/db_crashtest.py` script.

Reviewed By: riversand963

Differential Revision: D39939032

Pulled By: ltamasi

fbshipit-source-id: eafdf124e95993fb7d73158e3b006d11819f7fa9
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent fd71a82f4f
commit 9078fcccee
  1. 42
      db_stress_tool/batched_ops_stress.cc
  2. 36
      db_stress_tool/cf_consistency_stress.cc
  3. 34
      db_stress_tool/db_stress_common.cc
  4. 5
      db_stress_tool/db_stress_common.h
  5. 4
      db_stress_tool/db_stress_gflags.cc
  6. 23
      db_stress_tool/db_stress_test_base.cc
  7. 9
      db_stress_tool/db_stress_tool.cc
  8. 45
      db_stress_tool/expected_state.cc
  9. 73
      db_stress_tool/no_batched_ops_stress.cc
  10. 14
      tools/db_crashtest.py

@ -26,31 +26,41 @@ class BatchedOpsStressTest : public StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[100]) override {
uint32_t value_base =
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
const std::string key_suffix = Key(rand_keys[0]);
const uint32_t value_base =
thread->rand.Next() % thread->shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
std::string keys[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
std::string values[10] = {"9", "8", "7", "6", "5", "4", "3", "2", "1", "0"};
Slice value_slices[10];
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const std::string value_suffix = Slice(value, sz).ToString();
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,
FLAGS_batch_protection_bytes_per_key,
FLAGS_user_timestamp_size);
Status s;
auto cfh = column_families_[rand_column_families[0]];
std::string key_str = Key(rand_keys[0]);
for (int i = 0; i < 10; i++) {
keys[i] += key_str;
values[i] += v.ToString();
value_slices[i] = values[i];
ColumnFamilyHandle* const cfh = column_families_[rand_column_families[0]];
assert(cfh);
for (int i = 9; i >= 0; --i) {
const std::string prefix = std::to_string(i);
const std::string k = prefix + key_suffix;
const std::string v = prefix + value_suffix;
if (FLAGS_use_merge) {
batch.Merge(cfh, keys[i], value_slices[i]);
batch.Merge(cfh, k, v);
} else if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else {
batch.Put(cfh, keys[i], value_slices[i]);
batch.Put(cfh, k, v);
}
}
s = db_->Write(write_opts, &batch);
const Status s = db_->Write(write_opts, &batch);
if (!s.ok()) {
fprintf(stderr, "multiput error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);

@ -25,22 +25,36 @@ class CfConsistencyStressTest : public StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[100]) override {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
uint64_t value_base = batch_id_.fetch_add(1);
size_t sz =
GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
Slice v(value, sz);
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
const std::string k = Key(rand_keys[0]);
const uint32_t value_base = batch_id_.fetch_add(1);
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const Slice v(value, sz);
WriteBatch batch;
const bool use_put_entity = !FLAGS_use_merge &&
FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0;
for (auto cf : rand_column_families) {
ColumnFamilyHandle* cfh = column_families_[cf];
ColumnFamilyHandle* const cfh = column_families_[cf];
assert(cfh);
if (FLAGS_use_merge) {
batch.Merge(cfh, key, v);
} else { /* !FLAGS_use_merge */
batch.Put(cfh, key, v);
batch.Merge(cfh, k, v);
} else if (use_put_entity) {
batch.PutEntity(cfh, k, GenerateWideColumns(value_base, v));
} else {
batch.Put(cfh, k, v);
}
}
Status s = db_->Write(write_opts, &batch);
if (!s.ok()) {
fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
@ -538,7 +552,7 @@ class CfConsistencyStressTest : public StressTest {
}
private:
std::atomic<int64_t> batch_id_;
std::atomic<uint32_t> batch_id_;
};
StressTest* CreateCfConsistencyStressTest() {

@ -239,6 +239,40 @@ uint32_t GetValueBase(Slice s) {
return res;
}
// Deterministically builds a wide-column entity from the generated value.
// The number of columns (1 to 4) is derived from value_base, so the same
// value always yields the same entity. The default column holds the whole
// generated value; each extra column i splits the value so that its name is
// the first i characters and its value is the remainder.
WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice) {
  constexpr size_t max_columns = 4;
  const size_t num_columns = value_base % max_columns + 1;

  // Every additional column needs at least one character for its name;
  // the generated value must be long enough to split.
  assert(slice.size() >= num_columns);

  WideColumns result;
  result.reserve(num_columns);

  // Default column: the entire generated value.
  result.emplace_back(kDefaultWideColumnName, slice);

  // Split columns: name = first `split` chars, value = the rest.
  for (size_t split = 1; split < num_columns; ++split) {
    result.emplace_back(Slice(slice.data(), split),
                        Slice(slice.data() + split, slice.size() - split));
  }

  return result;
}
// Returns the wide columns for value_base/slice in the order RocksDB is
// expected to return them during reads: the same columns produced by
// GenerateWideColumns, sorted by column name.
WideColumns GenerateExpectedWideColumns(uint32_t value_base,
                                        const Slice& slice) {
  WideColumns expected = GenerateWideColumns(value_base, slice);

  const auto by_name = [](const WideColumn& lhs, const WideColumn& rhs) {
    return lhs.name().compare(rhs.name()) < 0;
  };
  std::sort(expected.begin(), expected.end(), by_name);

  return expected;
}
std::string GetNowNanos() {
uint64_t t = db_stress_env->NowNanos();
std::string ret;

@ -235,6 +235,7 @@ DECLARE_bool(in_place_update);
DECLARE_string(memtablerep);
DECLARE_int32(prefix_size);
DECLARE_bool(use_merge);
DECLARE_uint32(use_put_entity_one_in);
DECLARE_bool(use_full_merge_v1);
DECLARE_int32(sync_wal_one_in);
DECLARE_bool(avoid_unnecessary_blocking_io);
@ -620,6 +621,10 @@ extern std::vector<int64_t> GenerateNKeys(ThreadState* thread, int num_keys,
extern size_t GenerateValue(uint32_t rand, char* v, size_t max_sz);
extern uint32_t GetValueBase(Slice s);
extern WideColumns GenerateWideColumns(uint32_t value_base, const Slice& slice);
extern WideColumns GenerateExpectedWideColumns(uint32_t value_base,
const Slice& slice);
extern StressTest* CreateCfConsistencyStressTest();
extern StressTest* CreateBatchedOpsStressTest();
extern StressTest* CreateNonBatchedOpsStressTest();

@ -858,6 +858,10 @@ DEFINE_bool(use_merge, false,
"On true, replaces all writes with a Merge "
"that behaves like a Put");
DEFINE_uint32(use_put_entity_one_in, 0,
"If greater than zero, PutEntity will be used once per every N "
"write ops on average.");
DEFINE_bool(use_full_merge_v1, false,
"On true, use a merge operator that implement the deprecated "
"version of FullMerge");

@ -457,11 +457,14 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
Status s;
for (auto cfh : column_families_) {
for (int64_t k = 0; k != number_of_keys; ++k) {
std::string key_str = Key(k);
Slice key = key_str;
size_t sz = GenerateValue(0 /*value_base*/, value, sizeof(value));
Slice v(value, sz);
shared->Put(cf_idx, k, 0, true /* pending */);
const std::string key = Key(k);
constexpr uint32_t value_base = 0;
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const Slice v(value, sz);
shared->Put(cf_idx, k, value_base, true /* pending */);
if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
@ -478,13 +481,13 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
}
#endif
}
} else if (FLAGS_use_put_entity_one_in > 0) {
s = db_->PutEntity(write_opts, cfh, key,
GenerateWideColumns(value_base, v));
} else {
if (!FLAGS_use_txn) {
std::string ts_str;
Slice ts;
if (FLAGS_user_timestamp_size > 0) {
ts_str = GetNowNanos();
ts = ts_str;
const std::string ts = GetNowNanos();
s = db_->Put(write_opts, cfh, key, ts, v);
} else {
s = db_->Put(write_opts, cfh, key, v);
@ -503,7 +506,7 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
}
}
shared->Put(cf_idx, k, 0, false /* pending */);
shared->Put(cf_idx, k, value_base, false /* pending */);
if (!s.ok()) {
break;
}

@ -294,6 +294,15 @@ int db_stress_tool(int argc, char** argv) {
exit(1);
}
if (FLAGS_use_put_entity_one_in > 0 &&
(FLAGS_use_merge || FLAGS_use_full_merge_v1 || FLAGS_use_txn ||
FLAGS_test_multi_ops_txns || FLAGS_user_timestamp_size > 0)) {
fprintf(stderr,
"PutEntity is currently incompatible with Merge, transactions, and "
"user-defined timestamps\n");
exit(1);
}
#ifndef NDEBUG
KillPoint* kp = KillPoint::GetInstance();
kp->rocksdb_kill_odds = FLAGS_kill_random_test;

@ -7,6 +7,7 @@
#include "db_stress_tool/expected_state.h"
#include "db/wide/wide_column_serialization.h"
#include "db_stress_tool/db_stress_common.h"
#include "db_stress_tool/db_stress_shared_state.h"
#include "rocksdb/trace_reader_writer.h"
@ -405,6 +406,50 @@ class ExpectedStateTraceRecordHandler : public TraceRecord::Handler,
return Status::OK();
}
// Replays a PutEntity write batch entry against the expected state.
// Parses the key, deserializes and validates the wide-column entity, and
// records the write (or buffers it if a batch is being accumulated).
// Returns Corruption if the key or entity does not match the format the
// stress test writes.
Status PutEntityCF(uint32_t column_family_id, const Slice& key_with_ts,
                   const Slice& entity) override {
  // Keys in the trace may carry a user-defined timestamp suffix; strip it
  // before parsing the numeric key id.
  Slice key =
      StripTimestampFromUserKey(key_with_ts, FLAGS_user_timestamp_size);

  uint64_t key_id = 0;
  if (!GetIntVal(key.ToString(), &key_id)) {
    return Status::Corruption("Unable to parse key", key.ToString());
  }

  // Deserialize advances the input slice, so work on a copy to keep
  // `entity` intact for the error messages below.
  Slice entity_copy = entity;
  WideColumns columns;
  if (!WideColumnSerialization::Deserialize(entity_copy, columns).ok()) {
    return Status::Corruption("Unable to deserialize entity",
                              entity.ToString(/* hex */ true));
  }

  // The stress test always writes the generated value into the default
  // column; its presence (and position as the first column) is required to
  // recover the value base.
  if (columns.empty() || columns[0].name() != kDefaultWideColumnName) {
    return Status::Corruption("Cannot find default column in entity",
                              entity.ToString(/* hex */ true));
  }

  const Slice& value_of_default = columns[0].value();
  const uint32_t value_base = GetValueBase(value_of_default);

  // The entity is fully determined by the value base: regenerate the
  // expected columns and verify the stored entity matches exactly.
  if (columns != GenerateExpectedWideColumns(value_base, value_of_default)) {
    return Status::Corruption("Wide columns in entity inconsistent",
                              entity.ToString(/* hex */ true));
  }

  // When replaying inside a multi-op transaction, buffer the write instead
  // of applying it immediately — presumably it is applied on commit
  // (TODO confirm against the buffered_writes_ flush path).
  if (buffered_writes_) {
    return WriteBatchInternal::PutEntity(buffered_writes_.get(),
                                         column_family_id, key, columns);
  }

  // Record the write as complete (not pending) in the expected state.
  state_->Put(column_family_id, static_cast<int64_t>(key_id), value_base,
              false /* pending */);

  ++num_write_ops_;

  return Status::OK();
}
Status DeleteCF(uint32_t column_family_id,
const Slice& key_with_ts) override {
Slice key =

@ -57,10 +57,14 @@ class NonBatchedOpsStressTest : public StressTest {
kNumberOfMethods
};
const int num_methods =
constexpr int num_methods =
static_cast<int>(VerificationMethod::kNumberOfMethods);
// Note: Merge/GetMergeOperands is currently not supported for wide-column
// entities
const VerificationMethod method =
static_cast<VerificationMethod>(thread->rand.Uniform(num_methods));
static_cast<VerificationMethod>(thread->rand.Uniform(
FLAGS_use_put_entity_one_in > 0 ? num_methods - 1 : num_methods));
if (method == VerificationMethod::kIterator) {
std::unique_ptr<Iterator> iter(
@ -88,13 +92,11 @@ class NonBatchedOpsStressTest : public StressTest {
}
Status s = iter->status();
Slice iter_key;
std::string from_db;
if (iter->Valid()) {
iter_key = iter->key();
const int diff = iter_key.compare(k);
const int diff = iter->key().compare(k);
if (diff > 0) {
s = Status::NotFound();
@ -708,38 +710,44 @@ class NonBatchedOpsStressTest : public StressTest {
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[100]) override {
assert(!rand_column_families.empty());
assert(!rand_keys.empty());
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
assert(shared);
const int64_t max_key = shared->GetMaxKey();
int64_t rand_key = rand_keys[0];
int rand_column_family = rand_column_families[0];
std::string write_ts_str;
Slice write_ts;
std::string write_ts;
std::unique_ptr<MutexLock> lock(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
while (!shared->AllowsOverwrite(rand_key) &&
(FLAGS_use_merge || shared->Exists(rand_column_family, rand_key))) {
lock.reset();
rand_key = thread->rand.Next() % max_key;
rand_column_family = thread->rand.Next() % FLAGS_column_families;
lock.reset(
new MutexLock(shared->GetMutexForKey(rand_column_family, rand_key)));
if (FLAGS_user_timestamp_size > 0) {
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
write_ts = GetNowNanos();
}
}
if (write_ts.size() == 0 && FLAGS_user_timestamp_size) {
write_ts_str = GetNowNanos();
write_ts = write_ts_str;
if (write_ts.empty() && FLAGS_user_timestamp_size) {
write_ts = GetNowNanos();
}
std::string key_str = Key(rand_key);
Slice key = key_str;
ColumnFamilyHandle* cfh = column_families_[rand_column_family];
const std::string k = Key(rand_key);
ColumnFamilyHandle* const cfh = column_families_[rand_column_family];
assert(cfh);
if (FLAGS_verify_before_write) {
std::string key_str2 = Key(rand_key);
Slice k = key_str2;
std::string from_db;
Status s = db_->Get(read_opts, cfh, k, &from_db);
if (!VerifyOrSyncValue(rand_column_family, rand_key, read_opts, shared,
@ -747,39 +755,47 @@ class NonBatchedOpsStressTest : public StressTest {
return s;
}
}
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
const uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
const size_t sz = GenerateValue(value_base, value, sizeof(value));
const Slice v(value, sz);
shared->Put(rand_column_family, rand_key, value_base, true /* pending */);
Status s;
if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
s = db_->Merge(write_opts, cfh, key, v);
s = db_->Merge(write_opts, cfh, k, v);
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Merge(cfh, key, v);
s = txn->Merge(cfh, k, v);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
}
#endif
}
} else if (FLAGS_use_put_entity_one_in > 0 &&
(value_base % FLAGS_use_put_entity_one_in) == 0) {
s = db_->PutEntity(write_opts, cfh, k,
GenerateWideColumns(value_base, v));
} else {
if (!FLAGS_use_txn) {
if (FLAGS_user_timestamp_size == 0) {
s = db_->Put(write_opts, cfh, key, v);
s = db_->Put(write_opts, cfh, k, v);
} else {
s = db_->Put(write_opts, cfh, key, write_ts, v);
s = db_->Put(write_opts, cfh, k, write_ts, v);
}
} else {
#ifndef ROCKSDB_LITE
Transaction* txn;
s = NewTxn(write_opts, &txn);
if (s.ok()) {
s = txn->Put(cfh, key, v);
s = txn->Put(cfh, k, v);
if (s.ok()) {
s = CommitTxn(txn, thread);
}
@ -787,7 +803,9 @@ class NonBatchedOpsStressTest : public StressTest {
#endif
}
}
shared->Put(rand_column_family, rand_key, value_base, false /* pending */);
if (!s.ok()) {
if (FLAGS_injest_error_severity >= 2) {
if (!is_db_stopped_ && s.severity() >= Status::Severity::kFatalError) {
@ -802,6 +820,7 @@ class NonBatchedOpsStressTest : public StressTest {
std::terminate();
}
}
thread->stats.AddBytesForWrites(1, sz);
PrintKeyValue(rand_column_family, static_cast<uint32_t>(rand_key), value,
sz);

@ -124,6 +124,8 @@ default_params = {
# fast_lru_cache is incompatible with stress tests, because it doesn't support strict_capacity_limit == false.
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
# use_put_entity_one_in has to be the same across invocations for verification to work, hence no lambda
"use_put_entity_one_in": random.choice([0] * 7 + [1, 5, 10]),
# 999 -> use Bloom API
"ribbon_starting_level": lambda: random.choice([random.randint(-1, 10), 999]),
"value_size_mult": 32,
@ -348,6 +350,8 @@ txn_params = {
# pipeline write is not currnetly compatible with WritePrepared txns
"enable_pipelined_write": 0,
"create_timestamped_snapshot_one_in": random.choice([0, 20]),
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in" : 0,
}
best_efforts_recovery_params = {
@ -392,6 +396,8 @@ ts_params = {
"enable_blob_files": 0,
"use_blob_db": 0,
"ingest_external_file_one_in": 0,
# PutEntity with timestamps is not yet implemented
"use_put_entity_one_in" : 0,
}
tiered_params = {
@ -446,6 +452,8 @@ multiops_txn_default_params = {
"enable_compaction_filter": 0,
"create_timestamped_snapshot_one_in": 50,
"sync_fault_injection": 0,
# PutEntity in transactions is not yet implemented
"use_put_entity_one_in" : 0,
}
multiops_wc_txn_params = {
@ -595,6 +603,12 @@ def finalize_and_sanitize(src_params):
# compatible with only write committed policy
if (dest_params.get("use_txn") == 1 and dest_params.get("txn_write_policy") != 0):
dest_params["sync_fault_injection"] = 0
# PutEntity is currently not supported with Merge
if dest_params["use_put_entity_one_in"] != 0:
dest_params["use_merge"] = 0
dest_params["use_full_merge_v1"] = 0
return dest_params

Loading…
Cancel
Save