fix db_stress test

Summary: Fix the db_stress test, let is run with HashSkipList for real

Test Plan:
python tools/db_crashtest.py
python tools/db_crashtest2.py

Reviewers: igor, haobo

Reviewed By: igor

CC: leveldb

Differential Revision: https://reviews.facebook.net/D16773
main
Lei Jin 11 years ago
parent 6c66bc08d9
commit 02dab3be11
  1. 25
      tools/db_crashtest.py
  2. 2
      tools/db_crashtest2.py
  3. 44
      tools/db_stress.cc

@ -93,6 +93,8 @@ def main(argv):
--max_background_compactions=20 --max_background_compactions=20
--max_bytes_for_level_base=10485760 --max_bytes_for_level_base=10485760
--filter_deletes=%s --filter_deletes=%s
--memtablerep=prefix_hash
--prefix_size=2
""" % (ops_per_thread, """ % (ops_per_thread,
threads, threads,
write_buf_size, write_buf_size,
@ -108,16 +110,23 @@ def main(argv):
print("Running db_stress with pid=%d: %s\n\n" print("Running db_stress with pid=%d: %s\n\n"
% (child.pid, cmd)) % (child.pid, cmd))
stop_early = False
while time.time() < killtime: while time.time() < killtime:
time.sleep(10) if child.poll() is not None:
print("WARNING: db_stress ended before kill: exitcode=%d\n"
% child.returncode)
stop_early = True
break
time.sleep(1)
if child.poll() is not None: if not stop_early:
print("WARNING: db_stress ended before kill: exitcode=%d\n" if child.poll() is not None:
% child.returncode) print("WARNING: db_stress ended before kill: exitcode=%d\n"
else: % child.returncode)
child.kill() else:
print("KILLED %d\n" % child.pid) child.kill()
time.sleep(1) # time to stabilize after a kill print("KILLED %d\n" % child.pid)
time.sleep(1) # time to stabilize after a kill
while True: while True:
line = child.stderr.readline().strip() line = child.stderr.readline().strip()

@ -107,6 +107,8 @@ def main(argv):
--max_background_compactions=20 --max_background_compactions=20
--max_bytes_for_level_base=10485760 --max_bytes_for_level_base=10485760
--filter_deletes=%s --filter_deletes=%s
--memtablerep=prefix_hash
--prefix_size=2
%s %s
""" % (random.randint(0, 1), """ % (random.randint(0, 1),
threads, threads,

@ -328,17 +328,17 @@ enum RepFactory StringToRepFactory(const char* ctype) {
return kSkipList; return kSkipList;
} }
static enum RepFactory FLAGS_rep_factory; static enum RepFactory FLAGS_rep_factory;
DEFINE_string(memtablerep, "skip_list", ""); DEFINE_string(memtablerep, "prefix_hash", "");
static bool ValidatePrefixSize(const char* flagname, int32_t value) { static bool ValidatePrefixSize(const char* flagname, int32_t value) {
if (value < 0 || value>=2000000000) { if (value < 0 || value > 8) {
fprintf(stderr, "Invalid value for --%s: %d. 0<= PrefixSize <=2000000000\n", fprintf(stderr, "Invalid value for --%s: %d. 0 <= PrefixSize <= 8\n",
flagname, value); flagname, value);
return false; return false;
} }
return true; return true;
} }
DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); DEFINE_int32(prefix_size, 2, "Control the prefix size for HashSkipListRep");
static const bool FLAGS_prefix_size_dummy = static const bool FLAGS_prefix_size_dummy =
google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize);
@ -931,15 +931,15 @@ class StressTest {
return s; return s;
} }
// Given a prefix P, this does prefix scans for "0"+P, "1"+P,..."9"+P // Given a key, this does prefix scans for "0"+P, "1"+P,..."9"+P
// in the same snapshot. Each of these 10 scans returns a series of // in the same snapshot where P is the first FLAGS_prefix_size - 1 bytes
// values; each series should be the same length, and it is verified // of the key. Each of these 10 scans returns a series of values;
// for each index i that all the i'th values are of the form "0"+V, // each series should be the same length, and it is verified for each
// "1"+V,..."9"+V. // index i that all the i'th values are of the form "0"+V, "1"+V,..."9"+V.
// ASSUMES that MultiPut was used to put (K, V) // ASSUMES that MultiPut was used to put (K, V)
Status MultiPrefixScan(ThreadState* thread, Status MultiPrefixScan(ThreadState* thread,
const ReadOptions& readoptions, const ReadOptions& readoptions,
const Slice& prefix) { const Slice& key) {
std::string prefixes[10] = {"0", "1", "2", "3", "4", std::string prefixes[10] = {"0", "1", "2", "3", "4",
"5", "6", "7", "8", "9"}; "5", "6", "7", "8", "9"};
Slice prefix_slices[10]; Slice prefix_slices[10];
@ -948,8 +948,9 @@ class StressTest {
Iterator* iters[10]; Iterator* iters[10];
Status s = Status::OK(); Status s = Status::OK();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
prefixes[i] += prefix.ToString(); prefixes[i] += key.ToString();
prefix_slices[i] = prefixes[i]; prefixes[i].resize(FLAGS_prefix_size);
prefix_slices[i] = Slice(prefixes[i]);
readoptionscopy[i] = readoptions; readoptionscopy[i] = readoptions;
readoptionscopy[i].prefix = &prefix_slices[i]; readoptionscopy[i].prefix = &prefix_slices[i];
readoptionscopy[i].snapshot = snapshot; readoptionscopy[i].snapshot = snapshot;
@ -980,7 +981,7 @@ class StressTest {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
if (values[i] != values[0]) { if (values[i] != values[0]) {
fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n", fprintf(stderr, "error : inconsistent values for prefix %s: %s, %s\n",
prefix.ToString().c_str(), values[0].c_str(), prefixes[i].c_str(), values[0].c_str(),
values[i].c_str()); values[i].c_str());
// we continue after error rather than exiting so that we can // we continue after error rather than exiting so that we can
// find more errors if any // find more errors if any
@ -1016,6 +1017,7 @@ class StressTest {
const Snapshot* snapshot = db_->GetSnapshot(); const Snapshot* snapshot = db_->GetSnapshot();
ReadOptions readoptionscopy = readoptions; ReadOptions readoptionscopy = readoptions;
readoptionscopy.snapshot = snapshot; readoptionscopy.snapshot = snapshot;
readoptionscopy.prefix_seek = FLAGS_prefix_size > 0;
unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy)); unique_ptr<Iterator> iter(db_->NewIterator(readoptionscopy));
iter->Seek(key); iter->Seek(key);
@ -1098,8 +1100,8 @@ class StressTest {
// keys are longs (e.g., 8 bytes), so we let prefixes be // keys are longs (e.g., 8 bytes), so we let prefixes be
// everything except the last byte. So there will be 2^8=256 // everything except the last byte. So there will be 2^8=256
// keys per prefix. // keys per prefix.
Slice prefix = Slice(key.data(), key.size() - 1);
if (!FLAGS_test_batches_snapshots) { if (!FLAGS_test_batches_snapshots) {
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
read_opts.prefix = &prefix; read_opts.prefix = &prefix;
Iterator* iter = db_->NewIterator(read_opts); Iterator* iter = db_->NewIterator(read_opts);
int count = 0; int count = 0;
@ -1115,7 +1117,7 @@ class StressTest {
} }
delete iter; delete iter;
} else { } else {
MultiPrefixScan(thread, read_opts, prefix); MultiPrefixScan(thread, read_opts, key);
} }
read_opts.prefix = nullptr; read_opts.prefix = nullptr;
} else if (prefixBound <= prob_op && prob_op < writeBound) { } else if (prefixBound <= prob_op && prob_op < writeBound) {
@ -1509,6 +1511,18 @@ int main(int argc, char** argv) {
// max number of concurrent compactions. // max number of concurrent compactions.
FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions); FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
if (FLAGS_prefixpercent > 0 && FLAGS_prefix_size <= 0) {
fprintf(stderr,
"Error: prefixpercent is non-zero while prefix_size is "
"not positive!\n");
exit(1);
}
if (FLAGS_test_batches_snapshots && FLAGS_prefix_size <= 0) {
fprintf(stderr,
"Error: please specify prefix_size for "
"test_batches_snapshots test!\n");
exit(1);
}
if ((FLAGS_readpercent + FLAGS_prefixpercent + if ((FLAGS_readpercent + FLAGS_prefixpercent +
FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) { FLAGS_writepercent + FLAGS_delpercent + FLAGS_iterpercent) != 100) {
fprintf(stderr, fprintf(stderr,

Loading…
Cancel
Save