Add crash-recovery correctness check to db_stress

Summary:
Previously, our `db_stress` tool held the expected state of the DB in-memory, so after crash-recovery, there was no way to verify data correctness. This PR adds an option, `--expected_values_file`, which specifies a file holding the expected values.

In black-box testing, the `db_stress` process can be killed arbitrarily, so updates to the `--expected_values_file` must be atomic. We achieve this by `mmap`ing the file and relying on `std::atomic<uint32_t>` for atomicity. Actually this doesn't provide a total guarantee on what we want as `std::atomic<uint32_t>` could, in theory, be translated into multiple stores surrounded by a mutex. We can verify our assumption by looking at `std::atomic::is_always_lock_free`.

For the `mmap`'d file, we didn't have an existing way to expose its contents as a raw memory buffer. This PR adds it in the `Env::NewMemoryMappedFileBuffer` function, and `MemoryMappedFileBuffer` class.

`db_crashtest.py` is updated to use an expected values file for black-box testing. On the first iteration (when the DB is created), an empty file is provided as `db_stress` will populate it when it runs. On subsequent iterations, that same filename is provided so `db_stress` can check the data is as expected on startup.
Closes https://github.com/facebook/rocksdb/pull/3629

Differential Revision: D7463144

Pulled By: ajkr

fbshipit-source-id: c8f3e82c93e045a90055e2468316be155633bd8b
main
Andrew Kryczka 6 years ago committed by Facebook Github Bot
parent bc0da4b512
commit a4fb1f8c04
  1. 2
      env/env.cc
  2. 41
      env/env_posix.cc
  3. 35
      env/env_test.cc
  4. 5
      env/io_posix.cc
  5. 6
      env/io_posix.h
  6. 22
      include/rocksdb/env.h
  7. 18
      tools/db_crashtest.py
  8. 188
      tools/db_stress.cc

2
env/env.cc vendored

@ -87,6 +87,8 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() {
}
MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
Logger::~Logger() {}
Status Logger::Close() {

41
env/env_posix.cc vendored

@ -457,6 +457,47 @@ class PosixEnv : public Env {
return Status::OK();
}
virtual Status NewMemoryMappedFileBuffer(
const std::string& fname,
unique_ptr<MemoryMappedFileBuffer>* result) override {
int fd = -1;
Status status;
while (fd < 0) {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_RDWR, 0644);
if (fd < 0) {
// Error while opening the file
if (errno == EINTR) {
continue;
}
status =
IOError("While open file for raw mmap buffer access", fname, errno);
break;
}
}
uint64_t size;
if (status.ok()) {
status = GetFileSize(fname, &size);
}
void* base;
if (status.ok()) {
base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
status = IOError("while mmap file for read", fname, errno);
}
}
if (status.ok()) {
result->reset(
new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
}
if (fd >= 0) {
// don't need to keep it open after mmap has been called
close(fd);
}
return status;
}
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) override {
result->reset();

35
env/env_test.cc vendored

@ -200,6 +200,41 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) {
}
#endif
TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {
const int kFileBytes = 1 << 15; // 32 KB
std::string expected_data;
std::string fname = test::TmpDir(env_) + "/" + "testfile";
{
unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
Random rnd(301);
test::RandomString(&rnd, kFileBytes, &expected_data);
ASSERT_OK(wfile->Append(expected_data));
}
std::unique_ptr<MemoryMappedFileBuffer> mmap_buffer;
Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer);
// it should be supported at least on linux
#if !defined(OS_LINUX)
if (status.IsNotSupported()) {
fprintf(stderr,
"skipping EnvPosixTest.MemoryMappedFileBuffer due to "
"unsupported Env::NewMemoryMappedFileBuffer\n");
return;
}
#endif // !defined(OS_LINUX)
ASSERT_OK(status);
ASSERT_NE(nullptr, mmap_buffer.get());
ASSERT_NE(nullptr, mmap_buffer->base);
ASSERT_EQ(kFileBytes, mmap_buffer->length);
std::string actual_data(static_cast<char*>(mmap_buffer->base),
mmap_buffer->length);
ASSERT_EQ(expected_data, actual_data);
}
TEST_P(EnvPosixTestWithParam, UnSchedule) {
std::atomic<bool> called(false);
env_->SetBackgroundThreads(1, Env::LOW);

5
env/io_posix.cc vendored

@ -1052,6 +1052,11 @@ Status PosixRandomRWFile::Close() {
return Status::OK();
}
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
// TODO should have error handling though not much we can do...
munmap(this->base, length);
}
/*
* PosixDirectory
*/

6
env/io_posix.h vendored

@ -236,6 +236,12 @@ class PosixRandomRWFile : public RandomRWFile {
int fd_;
};
struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
PosixMemoryMappedFileBuffer(void* _base, size_t _length)
: MemoryMappedFileBuffer(_base, _length) {}
virtual ~PosixMemoryMappedFileBuffer();
};
class PosixDirectory : public Directory {
public:
explicit PosixDirectory(int fd) : fd_(fd) {}

@ -42,6 +42,7 @@ class SequentialFile;
class Slice;
class WritableFile;
class RandomRWFile;
struct MemoryMappedFileBuffer;
class Directory;
struct DBOptions;
struct ImmutableDBOptions;
@ -204,6 +205,16 @@ class Env {
return Status::NotSupported("RandomRWFile is not implemented in this Env");
}
// Opens `fname` as a memory-mapped file for read and write (in-place updates
// only, i.e., no appends). On success, stores a raw buffer covering the whole
// file in `*result`. The file must exist prior to this call.
virtual Status NewMemoryMappedFileBuffer(
const std::string& /*fname*/,
unique_ptr<MemoryMappedFileBuffer>* /*result*/) {
return Status::NotSupported(
"MemoryMappedFileBuffer is not implemented in this Env");
}
// Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object.
@ -804,6 +815,17 @@ class RandomRWFile {
RandomRWFile& operator=(const RandomRWFile&) = delete;
};
// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer.
// Subclasses should release the mapping upon destruction.
struct MemoryMappedFileBuffer {
MemoryMappedFileBuffer(void* _base, size_t _length)
: base(_base), length(_length) {}
virtual ~MemoryMappedFileBuffer() = 0;
void* const base;
const size_t length;
};
// Directory object represents collection of files and implements
// filesystem operations that can be executed on directories.
class Directory {

@ -16,14 +16,19 @@ import argparse
# for simple:
# simple_default_params < blackbox|whitebox_simple_default_params < args
expected_values_file = tempfile.NamedTemporaryFile()
default_params = {
"acquire_snapshot_one_in": 10000,
"block_size": 16384,
"cache_size": 1048576,
"clear_column_family_one_in": 0,
"compression_type": "snappy",
"use_clock_cache": "false",
"delpercent": 5,
"destroy_db_initially": 0,
"disable_wal": 0,
"expected_values_path": expected_values_file.name,
"allow_concurrent_memtable_write": 0,
"iterpercent": 10,
"max_background_compactions": 20,
@ -88,10 +93,13 @@ simple_default_params = {
"block_size": 16384,
"cache_size": 1048576,
"use_clock_cache": "false",
"clear_column_family_one_in": 0,
"column_families": 1,
"compression_type": "snappy",
"delpercent": 5,
"destroy_db_initially": 0,
"disable_wal": 0,
"expected_values_path": expected_values_file.name,
"allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"iterpercent": 10,
"max_background_compactions": 1,
@ -190,13 +198,16 @@ def blackbox_crash_main(args):
+ "threads=" + str(cmd_params['threads']) + "\n"
+ "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
+ "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n")
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n"
+ "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n")
while time.time() < exit_time:
run_had_errors = False
killtime = time.time() + cmd_params['interval']
cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items()))
cmd = gen_cmd(dict(
cmd_params.items() +
{'db': dbname}.items()))
child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
print("Running db_stress with pid=%d: %s\n\n"
@ -253,7 +264,8 @@ def whitebox_crash_main(args):
+ "threads=" + str(cmd_params['threads']) + "\n"
+ "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
+ "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n")
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n"
+ "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n")
total_check_mode = 4
check_mode = 0

@ -297,6 +297,14 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter"
DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_string(
expected_values_path, "",
"File where the array of expected uint32_t values will be stored. If "
"provided and non-empty, the DB state will be verified against these "
"values after recovery. --max_key and --column_family must be kept the "
"same across invocations of this program that use the same "
"--expected_values_path.");
DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage");
@ -768,7 +776,11 @@ class Stats {
// State shared by all concurrent executions of the same benchmark.
class SharedState {
public:
static const uint32_t SENTINEL;
// indicates a key may have any value (or not be present) as an operation on
// it is incomplete.
static const uint32_t UNKNOWN_SENTINEL;
// indicates a key should definitely be deleted
static const uint32_t DELETION_SENTINEL;
explicit SharedState(StressTest* stress_test)
: cv_(&mu_),
@ -786,7 +798,8 @@ class SharedState {
bg_thread_finished_(false),
stress_test_(stress_test),
verification_failure_(false),
no_overwrite_ids_(FLAGS_column_families) {
no_overwrite_ids_(FLAGS_column_families),
values_(nullptr) {
// Pick random keys in each column family that will not experience
// overwrite
@ -820,15 +833,69 @@ class SharedState {
}
delete[] permutation;
size_t expected_values_size =
sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
bool values_init_needed = false;
Status status;
if (!FLAGS_expected_values_path.empty()) {
if (!std::atomic<uint32_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_path on platforms without lock-free "
"std::atomic<uint32_t>");
}
if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_path on when "
"--clear_column_family_one_in is greater than zero.");
}
size_t size;
if (status.ok()) {
status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
}
unique_ptr<WritableFile> wfile;
if (status.ok() && size == 0) {
const EnvOptions soptions;
status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
soptions);
}
if (status.ok() && size == 0) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
values_init_needed = true;
}
if (status.ok()) {
status = FLAGS_env->NewMemoryMappedFileBuffer(
FLAGS_expected_values_path, &expected_mmap_buffer_);
}
if (status.ok()) {
assert(expected_mmap_buffer_->length == expected_values_size);
values_ =
static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->base);
assert(values_ != nullptr);
} else {
fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
FLAGS_expected_values_path.c_str(), status.ToString().c_str());
assert(values_ == nullptr);
}
}
if (values_ == nullptr) {
values_ =
static_cast<std::atomic<uint32_t>*>(malloc(expected_values_size));
values_init_needed = true;
}
assert(values_ != nullptr);
if (values_init_needed) {
for (int i = 0; i < FLAGS_column_families; ++i) {
for (int j = 0; j < max_key_; ++j) {
Delete(i, j, false /* pending */);
}
}
}
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
return;
}
values_.resize(FLAGS_column_families);
for (int i = 0; i < FLAGS_column_families; ++i) {
values_[i] = std::vector<uint32_t>(max_key_, SENTINEL);
}
long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
@ -935,27 +1002,57 @@ class SharedState {
}
}
std::atomic<uint32_t>& Value(int cf, int64_t key) const {
return values_[cf * max_key_ + key];
}
void ClearColumnFamily(int cf) {
std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL);
std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
DELETION_SENTINEL);
}
void Put(int cf, int64_t key, uint32_t value_base) {
values_[cf][key] = value_base;
// @param pending True if the update may have started but is not yet
// guaranteed finished. This is useful for crash-recovery testing when the
// process may crash before updating the expected values array.
void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
if (!pending) {
// prevent expected-value update from reordering before Write
std::atomic_thread_fence(std::memory_order_release);
}
Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
std::memory_order_relaxed);
if (pending) {
// prevent Write from reordering before expected-value update
std::atomic_thread_fence(std::memory_order_release);
}
}
uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; }
uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; }
// @param pending See comment above Put()
// Returns true if the key was not yet deleted.
bool Delete(int cf, int64_t key, bool pending) {
if (Value(cf, key) == DELETION_SENTINEL) {
return false;
}
Put(cf, key, DELETION_SENTINEL, pending);
return true;
}
void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; }
// @param pending See comment above Put()
// Returns true if the key was not yet deleted.
bool SingleDelete(int cf, int64_t key, bool pending) {
return Delete(cf, key, pending);
}
int DeleteRange(int cf, int64_t begin_key, int64_t end_key) {
// @param pending See comment above Put()
// Returns number of keys deleted by the call.
int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
int covered = 0;
for (int64_t key = begin_key; key < end_key; ++key) {
if (values_[cf][key] != SENTINEL) {
if (Delete(cf, key, pending)) {
++covered;
}
values_[cf][key] = SENTINEL;
}
return covered;
}
@ -964,7 +1061,11 @@ class SharedState {
return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end();
}
bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; }
bool Exists(int cf, int64_t key) {
uint32_t expected_value = Value(cf, key).load();
assert(expected_value != UNKNOWN_SENTINEL);
return expected_value != DELETION_SENTINEL;
}
uint32_t GetSeed() const { return seed_; }
@ -976,6 +1077,10 @@ class SharedState {
bool BgThreadFinished() const { return bg_thread_finished_; }
bool ShouldVerifyAtBeginning() const {
return expected_mmap_buffer_.get() != nullptr;
}
private:
port::Mutex mu_;
port::CondVar cv_;
@ -997,13 +1102,15 @@ class SharedState {
// Keys that should not be overwritten
std::vector<std::unordered_set<size_t> > no_overwrite_ids_;
std::vector<std::vector<uint32_t>> values_;
std::atomic<uint32_t>* values_;
// Has to make it owned by a smart ptr as port::Mutex is not copyable
// and storing it in the container may require copying depending on the impl.
std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
};
const uint32_t SharedState::SENTINEL = 0xffffffff;
const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
@ -1311,6 +1418,13 @@ class StressTest {
while (!shared.AllInitialized()) {
shared.GetCondVar()->Wait();
}
if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) {
printf("Crash-recovery verification failed :(\n");
} else {
printf("Crash-recovery verification passed :)\n");
}
}
auto now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
@ -1406,6 +1520,9 @@ class StressTest {
ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared;
if (shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{
MutexLock l(shared->GetMutex());
shared->IncInitialized();
@ -1987,7 +2104,7 @@ class StressTest {
}
} else if (prefixBound <= prob_op && prob_op < writeBound) {
// OPERATION write
uint32_t value_base = thread->rand.Next();
uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
if (!FLAGS_test_batches_snapshots) {
@ -2015,7 +2132,8 @@ class StressTest {
break;
}
}
shared->Put(rand_column_family, rand_key, value_base);
shared->Put(rand_column_family, rand_key, value_base,
true /* pending */);
Status s;
if (FLAGS_use_merge) {
if (!FLAGS_use_txn) {
@ -2048,6 +2166,8 @@ class StressTest {
#endif
}
}
shared->Put(rand_column_family, rand_key, value_base,
false /* pending */);
if (!s.ok()) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate();
@ -2079,7 +2199,7 @@ class StressTest {
// Use delete if the key may be overwritten and a single deletion
// otherwise.
if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
shared->Delete(rand_column_family, rand_key);
shared->Delete(rand_column_family, rand_key, true /* pending */);
Status s;
if (!FLAGS_use_txn) {
s = db_->Delete(write_opts, column_family, key);
@ -2095,13 +2215,15 @@ class StressTest {
}
#endif
}
shared->Delete(rand_column_family, rand_key, false /* pending */);
thread->stats.AddDeletes(1);
if (!s.ok()) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
shared->SingleDelete(rand_column_family, rand_key);
shared->SingleDelete(rand_column_family, rand_key,
true /* pending */);
Status s;
if (!FLAGS_use_txn) {
s = db_->SingleDelete(write_opts, column_family, key);
@ -2117,6 +2239,8 @@ class StressTest {
}
#endif
}
shared->SingleDelete(rand_column_family, rand_key,
false /* pending */);
thread->stats.AddSingleDeletes(1);
if (!s.ok()) {
fprintf(stderr, "single delete error: %s\n",
@ -2150,21 +2274,24 @@ class StressTest {
shared->GetMutexForKey(rand_column_family, rand_key + j)));
}
}
shared->DeleteRange(rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width,
true /* pending */);
keystr = Key(rand_key);
key = keystr;
column_family = column_families_[rand_column_family];
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
Slice end_key = end_keystr;
int covered = shared->DeleteRange(
rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width);
Status s = db_->DeleteRange(write_opts, column_family, key, end_key);
if (!s.ok()) {
fprintf(stderr, "delete range error: %s\n",
s.ToString().c_str());
std::terminate();
}
int covered = shared->DeleteRange(
rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width, false /* pending */);
thread->stats.AddRangeDeletions(1);
thread->stats.AddCoveredByRangeDeletions(covered);
}
@ -2276,12 +2403,15 @@ class StressTest {
// compare value_from_db with the value in the shared state
char value[kValueMaxLen];
uint32_t value_base = shared->Get(cf, key);
if (value_base == SharedState::SENTINEL && !strict) {
if (value_base == SharedState::UNKNOWN_SENTINEL) {
return true;
}
if (value_base == SharedState::DELETION_SENTINEL && !strict) {
return true;
}
if (s.ok()) {
if (value_base == SharedState::SENTINEL) {
if (value_base == SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Unexpected value found", cf, key);
return false;
}
@ -2296,7 +2426,7 @@ class StressTest {
return false;
}
} else {
if (value_base != SharedState::SENTINEL) {
if (value_base != SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
return false;
}

Loading…
Cancel
Save