Summary: The goal of this diff is to create a simple stress test with focus on catching: * bugs in compaction/flush processes, especially the ones that cause assertion errors * bugs in the code that deletes obsolete files There are two parts of the test: * write_stress, a binary that writes to the database * write_stress_runner.py, a script that invokes and kills write_stress Here are some interesting parts of write_stress: * Runs with very high concurrency of compactions and flushes (32 threads total) and tries to create a huge amount of small files * The keys written to the database are not uniformly distributed -- there is a 3-character prefix that mutates occasionally (in prefix mutator thread), in such a way that the first character mutates slower than second, which mutates slower than third character. That way, the compaction stress tests some interesting compaction features like trivial moves and bottommost level calculation * There is a thread that creates an iterator, holds it for couple of seconds and then iterates over all keys. This is supposed to test RocksDB's abilities to keep the files alive when there are references to them. * Some writes trigger WAL sync. This is stress testing our WAL sync code. * At the end of the run, we make sure that we didn't leak any of the sst files write_stress_runner.py changes the mode in which we run write_stress and also kills and restarts it. There are some interesting characteristics: * At the beginning we divide the full test runtime into smaller parts -- shorter runtimes (couple of seconds) and longer runtimes (100, 1000) seconds * The first time we run write_stress, we destroy the old DB. Every next time during the test, we use the same DB. * We can run in kill mode or clean-restart mode. Kill mode kills the write_stress violently. * We can run in mode where delete_obsolete_files_with_fullscan is true or false * We can run with low_open_files mode turned on or off. When it's turned on, we configure table cache to only hold a couple of files -- that way we need to reopen files every time we access them. Another goal was to create a stress test without a lot of parameters. So tools/write_stress_runner.py should only take one parameter -- runtime_sec and it should figure out everything else on its own. In a separate diff, I'll add this new test to our nightly legocastle runs. Test Plan: The goal of this test was to retroactively catch the following bugs: D33045, D48201, D46899, D42399. I failed to reproduce D48201, but all others have been caught! When i reverted https://reviews.facebook.net/D33045: ./write_stress --runtime_sec=200 --low_open_files_mode=true Iterator statuts not OK: IO error: /fast-rocksdb-tmp/rocksdb_test/write_stress/089166.sst: No such file or directory When i reverted https://reviews.facebook.net/D42399: python tools/write_stress_runner.py --runtime_sec=5000 Running write_stress, will kill after 5 seconds: ./write_stress --runtime_sec=-1 Running write_stress, will kill after 2 seconds: ./write_stress --runtime_sec=-1 --destroy_db=false --delete_obsolete_files_with_fullscan=true Running write_stress, will kill after 7 seconds: ./write_stress --runtime_sec=-1 --destroy_db=false Running write_stress, will kill after 5 seconds: ./write_stress --runtime_sec=-1 --destroy_db=false Running write_stress, will kill after 8 seconds: ./write_stress --runtime_sec=-1 --destroy_db=false --low_open_files_mode=true Write to DB failed: IO error: /fast-rocksdb-tmp/rocksdb_test/write_stress/019250.sst: No such file or directory ERROR: write_stress died with exitcode=-6 When i reverted https://reviews.facebook.net/D46899: python tools/write_stress_runner.py --runtime_sec=1000 runtime: 1000 Going to execute write stress for [3, 3, 100, 3, 2, 100, 1, 788] Running write_stress for 3 seconds: ./write_stress --runtime_sec=3 --low_open_files_mode=true Running write_stress for 3 seconds: ./write_stress --runtime_sec=3 --destroy_db=false --delete_obsolete_files_with_fullscan=true Running write_stress, will kill after 100 seconds: ./write_stress --runtime_sec=-1 --destroy_db=false --delete_obsolete_files_with_fullscan=true write_stress: db/db_impl.cc:2070: void rocksdb::DBImpl::MarkLogsSynced(uint64_t, bool, const rocksdb::Status&): Assertion `log.getting_synced' failed. ERROR: write_stress died with exitcode=-6 Reviewers: IslamAbdelRahman, yhchiang, rven, kradhakrishnan, sdong, anthony Reviewed By: anthony Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D49533main
parent
47414c6cd6
commit
4b66d95344
@ -0,0 +1,303 @@ |
||||
// Copyright (c) 2015, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
//
|
||||
//
|
||||
// The goal of this tool is to be a simple stress test with focus on catching:
|
||||
// * bugs in compaction/flush processes, especially the ones that cause
|
||||
// assertion errors
|
||||
// * bugs in the code that deletes obsolete files
|
||||
//
|
||||
// There are two parts of the test:
|
||||
// * write_stress, a binary that writes to the database
|
||||
// * write_stress_runner.py, a script that invokes and kills write_stress
|
||||
//
|
||||
// Here are some interesting parts of write_stress:
|
||||
// * Runs with very high concurrency of compactions and flushes (32 threads
|
||||
// total) and tries to create a huge amount of small files
|
||||
// * The keys written to the database are not uniformly distributed -- there is
|
||||
// a 3-character prefix that mutates occasionally (in prefix mutator thread), in
|
||||
// such a way that the first character mutates slower than second, which mutates
|
||||
// slower than third character. That way, the compaction stress tests some
|
||||
// interesting compaction features like trivial moves and bottommost level
|
||||
// calculation
|
||||
// * There is a thread that creates an iterator, holds it for couple of seconds
|
||||
// and then iterates over all keys. This is supposed to test RocksDB's abilities
|
||||
// to keep the files alive when there are references to them.
|
||||
// * Some writes trigger WAL sync. This is stress testing our WAL sync code.
|
||||
// * At the end of the run, we make sure that we didn't leak any of the sst
|
||||
// files
|
||||
//
|
||||
// write_stress_runner.py changes the mode in which we run write_stress and also
|
||||
// kills and restarts it. There are some interesting characteristics:
|
||||
// * At the beginning we divide the full test runtime into smaller parts --
|
||||
// shorter runtimes (couple of seconds) and longer runtimes (100, 1000) seconds
|
||||
// * The first time we run write_stress, we destroy the old DB. Every next time
|
||||
// during the test, we use the same DB.
|
||||
// * We can run in kill mode or clean-restart mode. Kill mode kills the
|
||||
// write_stress violently.
|
||||
// * We can run in mode where delete_obsolete_files_with_fullscan is true or
|
||||
// false
|
||||
// * We can run with low_open_files mode turned on or off. When it's turned on,
|
||||
// we configure table cache to only hold a couple of files -- that way we need
|
||||
// to reopen files every time we access them.
|
||||
//
|
||||
// Another goal was to create a stress test without a lot of parameters. So
|
||||
// tools/write_stress_runner.py should only take one parameter -- runtime_sec
|
||||
// and it should figure out everything else on its own.
|
||||
|
||||
#include <cstdio> |
||||
|
||||
#ifndef GFLAGS |
||||
int main() { |
||||
fprintf(stderr, "Please install gflags to run rocksdb tools\n"); |
||||
return 1; |
||||
} |
||||
#else |
||||
|
||||
#include <gflags/gflags.h> |
||||
|
||||
#define __STDC_FORMAT_MACROS |
||||
#include <inttypes.h> |
||||
#include <atomic> |
||||
#include <random> |
||||
#include <set> |
||||
#include <string> |
||||
#include <thread> |
||||
|
||||
#include "rocksdb/db.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/options.h" |
||||
#include "rocksdb/slice.h" |
||||
|
||||
#include "db/filename.h" |
||||
|
||||
using GFLAGS::ParseCommandLineFlags; |
||||
using GFLAGS::RegisterFlagValidator; |
||||
using GFLAGS::SetUsageMessage; |
||||
|
||||
DEFINE_int32(key_size, 10, "Key size"); |
||||
DEFINE_int32(value_size, 100, "Value size"); |
||||
DEFINE_string(db, "", "Use the db with the following name."); |
||||
DEFINE_bool(destroy_db, true, |
||||
"Destory the existing DB before running the test"); |
||||
|
||||
DEFINE_int32(runtime_sec, 10 * 60, "How long are we running for, in seconds"); |
||||
DEFINE_int32(seed, 139, "Random seed"); |
||||
|
||||
DEFINE_double(prefix_mutate_period_sec, 1.0, |
||||
"How often are we going to mutate the prefix"); |
||||
DEFINE_double(first_char_mutate_probability, 0.1, |
||||
"How likely are we to mutate the first char every period"); |
||||
DEFINE_double(second_char_mutate_probability, 0.2, |
||||
"How likely are we to mutate the second char every period"); |
||||
DEFINE_double(third_char_mutate_probability, 0.5, |
||||
"How likely are we to mutate the third char every period"); |
||||
|
||||
DEFINE_int32(iterator_hold_sec, 5, |
||||
"How long will the iterator hold files before it gets destroyed"); |
||||
|
||||
DEFINE_double(sync_probability, 0.01, "How often are we syncing writes"); |
||||
DEFINE_bool(delete_obsolete_files_with_fullscan, false, |
||||
"If true, we delete obsolete files after each compaction/flush " |
||||
"using GetChildren() API"); |
||||
DEFINE_bool(low_open_files_mode, false, |
||||
"If true, we set max_open_files to 20, so that every file access " |
||||
"needs to reopen it"); |
||||
|
||||
namespace rocksdb { |
||||
|
||||
static const int kPrefixSize = 3; |
||||
|
||||
class WriteStress { |
||||
public: |
||||
WriteStress() : stop_(false) { |
||||
// initialize key_prefix
|
||||
for (int i = 0; i < kPrefixSize; ++i) { |
||||
key_prefix_[i].store('a'); |
||||
} |
||||
|
||||
// Choose a location for the test database if none given with --db=<path>
|
||||
if (FLAGS_db.empty()) { |
||||
std::string default_db_path; |
||||
Env::Default()->GetTestDirectory(&default_db_path); |
||||
default_db_path += "/write_stress"; |
||||
FLAGS_db = default_db_path; |
||||
} |
||||
|
||||
Options options; |
||||
if (FLAGS_destroy_db) { |
||||
DestroyDB(FLAGS_db, options); // ignore
|
||||
} |
||||
|
||||
// make the LSM tree deep, so that we have many concurrent flushes and
|
||||
// compactions
|
||||
options.create_if_missing = true; |
||||
options.write_buffer_size = 256 * 1024; // 256k
|
||||
options.max_bytes_for_level_base = 1 * 1024 * 1204; // 1MB
|
||||
options.target_file_size_base = 100 * 1204; // 100k
|
||||
options.max_write_buffer_number = 16; |
||||
options.max_background_compactions = 16; |
||||
options.max_background_flushes = 16; |
||||
options.max_open_files = FLAGS_low_open_files_mode ? 20 : -1; |
||||
if (FLAGS_delete_obsolete_files_with_fullscan) { |
||||
options.delete_obsolete_files_period_micros = 0; |
||||
} |
||||
|
||||
// open DB
|
||||
DB* db; |
||||
Status s = DB::Open(options, FLAGS_db, &db); |
||||
if (!s.ok()) { |
||||
fprintf(stderr, "Can't open database: %s\n", s.ToString().c_str()); |
||||
std::abort(); |
||||
} |
||||
db_.reset(db); |
||||
} |
||||
|
||||
void WriteThread() { |
||||
std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed)); |
||||
std::uniform_real_distribution<double> dist(0, 1); |
||||
|
||||
auto random_string = [](std::mt19937& r, int len) { |
||||
std::uniform_int_distribution<char> char_dist('a', 'z'); |
||||
std::string ret; |
||||
for (int i = 0; i < len; ++i) { |
||||
ret += char_dist(r); |
||||
} |
||||
return ret; |
||||
}; |
||||
|
||||
while (!stop_.load(std::memory_order_relaxed)) { |
||||
std::string prefix; |
||||
prefix.resize(kPrefixSize); |
||||
for (int i = 0; i < kPrefixSize; ++i) { |
||||
prefix[i] = key_prefix_[i].load(std::memory_order_relaxed); |
||||
} |
||||
auto key = prefix + random_string(rng, FLAGS_key_size - kPrefixSize); |
||||
auto value = random_string(rng, FLAGS_value_size); |
||||
WriteOptions woptions; |
||||
woptions.sync = dist(rng) < FLAGS_sync_probability; |
||||
auto s = db_->Put(woptions, key, value); |
||||
if (!s.ok()) { |
||||
fprintf(stderr, "Write to DB failed: %s\n", s.ToString().c_str()); |
||||
std::abort(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void IteratorHoldThread() { |
||||
while (!stop_.load(std::memory_order_relaxed)) { |
||||
std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions())); |
||||
Env::Default()->SleepForMicroseconds(FLAGS_iterator_hold_sec * 1000 * |
||||
1000LL); |
||||
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { |
||||
} |
||||
if (!iterator->status().ok()) { |
||||
fprintf(stderr, "Iterator statuts not OK: %s\n", |
||||
iterator->status().ToString().c_str()); |
||||
std::abort(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
void PrefixMutatorThread() { |
||||
std::mt19937 rng(static_cast<unsigned int>(FLAGS_seed)); |
||||
std::uniform_real_distribution<double> dist(0, 1); |
||||
std::uniform_int_distribution<char> char_dist('a', 'z'); |
||||
while (!stop_.load(std::memory_order_relaxed)) { |
||||
Env::Default()->SleepForMicroseconds(FLAGS_prefix_mutate_period_sec * |
||||
1000 * 1000LL); |
||||
if (dist(rng) < FLAGS_first_char_mutate_probability) { |
||||
key_prefix_[0].store(char_dist(rng), std::memory_order_relaxed); |
||||
} |
||||
if (dist(rng) < FLAGS_second_char_mutate_probability) { |
||||
key_prefix_[1].store(char_dist(rng), std::memory_order_relaxed); |
||||
} |
||||
if (dist(rng) < FLAGS_third_char_mutate_probability) { |
||||
key_prefix_[2].store(char_dist(rng), std::memory_order_relaxed); |
||||
} |
||||
} |
||||
} |
||||
|
||||
int Run() { |
||||
threads_.emplace_back([&]() { WriteThread(); }); |
||||
threads_.emplace_back([&]() { PrefixMutatorThread(); }); |
||||
threads_.emplace_back([&]() { IteratorHoldThread(); }); |
||||
|
||||
if (FLAGS_runtime_sec == -1) { |
||||
// infinite runtime, until we get killed
|
||||
while (true) { |
||||
Env::Default()->SleepForMicroseconds(1000 * 1000); |
||||
} |
||||
} |
||||
|
||||
Env::Default()->SleepForMicroseconds(FLAGS_runtime_sec * 1000 * 1000); |
||||
|
||||
stop_.store(true, std::memory_order_relaxed); |
||||
for (auto& t : threads_) { |
||||
t.join(); |
||||
} |
||||
threads_.clear(); |
||||
|
||||
// let's see if we leaked some files
|
||||
db_->PauseBackgroundWork(); |
||||
std::vector<LiveFileMetaData> metadata; |
||||
db_->GetLiveFilesMetaData(&metadata); |
||||
std::set<uint64_t> sst_file_numbers; |
||||
for (const auto& file : metadata) { |
||||
uint64_t number; |
||||
FileType type; |
||||
if (!ParseFileName(file.name, &number, "LOG", &type)) { |
||||
continue; |
||||
} |
||||
if (type == kTableFile) { |
||||
sst_file_numbers.insert(number); |
||||
} |
||||
} |
||||
|
||||
std::vector<std::string> children; |
||||
Env::Default()->GetChildren(FLAGS_db, &children); |
||||
for (const auto& child : children) { |
||||
uint64_t number; |
||||
FileType type; |
||||
if (!ParseFileName(child, &number, "LOG", &type)) { |
||||
continue; |
||||
} |
||||
if (type == kTableFile) { |
||||
if (sst_file_numbers.find(number) == sst_file_numbers.end()) { |
||||
fprintf(stderr, |
||||
"Found a table file in DB path that should have been " |
||||
"deleted: %s\n", |
||||
child.c_str()); |
||||
std::abort(); |
||||
} |
||||
} |
||||
} |
||||
|
||||
db_->ContinueBackgroundWork(); |
||||
|
||||
return 0; |
||||
} |
||||
|
||||
private: |
||||
// each key is prepended with this prefix. we occasionally change it. third
|
||||
// letter is changed more frequently than second, which is changed more
|
||||
// frequently than the first one.
|
||||
std::atomic<char> key_prefix_[kPrefixSize]; |
||||
std::atomic<bool> stop_; |
||||
std::vector<std::thread> threads_; |
||||
std::unique_ptr<DB> db_; |
||||
}; |
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) { |
||||
SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + |
||||
" [OPTIONS]..."); |
||||
ParseCommandLineFlags(&argc, &argv, true); |
||||
rocksdb::WriteStress write_stress; |
||||
return write_stress.Run(); |
||||
} |
||||
|
||||
#endif // GFLAGS
|
@ -0,0 +1,73 @@ |
||||
#! /usr/bin/env python |
||||
import subprocess |
||||
import argparse |
||||
import random |
||||
import time |
||||
import sys |
||||
|
||||
|
||||
def generate_runtimes(total_runtime): |
||||
# combination of short runtimes and long runtimes, with heavier |
||||
# weight on short runtimes |
||||
possible_runtimes_sec = range(1, 10) + range(1, 20) + [100, 1000] |
||||
runtimes = [] |
||||
while total_runtime > 0: |
||||
chosen = random.choice(possible_runtimes_sec) |
||||
chosen = min(chosen, total_runtime) |
||||
runtimes.append(chosen) |
||||
total_runtime -= chosen |
||||
return runtimes |
||||
|
||||
|
||||
def main(args): |
||||
runtimes = generate_runtimes(int(args.runtime_sec)) |
||||
print "Going to execute write stress for " + str(runtimes) |
||||
first_time = True |
||||
|
||||
for runtime in runtimes: |
||||
kill = random.choice([False, True]) |
||||
|
||||
cmd = './write_stress --runtime_sec=' + \ |
||||
("-1" if kill else str(runtime)) |
||||
|
||||
if len(args.db) > 0: |
||||
cmd = cmd + ' --db=' + args.db |
||||
|
||||
if first_time: |
||||
first_time = False |
||||
else: |
||||
# use current db |
||||
cmd = cmd + ' --destroy_db=false' |
||||
if random.choice([False, True]): |
||||
cmd = cmd + ' --delete_obsolete_files_with_fullscan=true' |
||||
if random.choice([False, True]): |
||||
cmd = cmd + ' --low_open_files_mode=true' |
||||
|
||||
print("Running write_stress for %d seconds (%s): %s" % |
||||
(runtime, ("kill-mode" if kill else "clean-shutdown-mode"), |
||||
cmd)) |
||||
|
||||
child = subprocess.Popen([cmd], shell=True) |
||||
killtime = time.time() + runtime |
||||
while not kill or time.time() < killtime: |
||||
time.sleep(1) |
||||
if child.poll() is not None: |
||||
if child.returncode == 0: |
||||
break |
||||
else: |
||||
print("ERROR: write_stress died with exitcode=%d\n" |
||||
% child.returncode) |
||||
sys.exit(1) |
||||
if kill: |
||||
child.kill() |
||||
# breathe |
||||
time.sleep(3) |
||||
|
||||
if __name__ == '__main__': |
||||
random.seed(time.time()) |
||||
parser = argparse.ArgumentParser(description="This script runs and kills \ |
||||
write_stress multiple times") |
||||
parser.add_argument("--runtime_sec", default='1000') |
||||
parser.add_argument("--db", default='') |
||||
args = parser.parse_args() |
||||
main(args) |
Loading…
Reference in new issue