Second attempt at db_stress crash-recovery verification

Summary:
- Original commit: a4fb1f8c04
- Revert commit (we reverted as a quick fix to get crash tests passing): 6afe22db2e

This PR includes the contents of the original commit plus two bug fixes, which are:

- In whitebox crash test, only set `--expected_values_path` for `db_stress` runs in the first half of the crash test's duration. In the second half, a fresh DB is created for each `db_stress` run, so we cannot maintain expected state across `db_stress` runs.
- Made `Exists()` return true for `UNKNOWN_SENTINEL` values. I previously had an assert in `Exists()` that value was not `UNKNOWN_SENTINEL`. But it is possible for post-crash-recovery expected values to be `UNKNOWN_SENTINEL` (i.e., if the crash happens in the middle of an update), in which case this assertion would be tripped. The effect of returning true in this case is there may be cases where a `SingleDelete` deletes no data. But if we had returned false, the effect would be calling `SingleDelete` on a key with multiple older versions, which is not supported.
Closes https://github.com/facebook/rocksdb/pull/3793

Differential Revision: D7811671

Pulled By: ajkr

fbshipit-source-id: 67e0295bfb1695ff9674837f2e05bb29c50efc30
main
Andrew Kryczka 7 years ago committed by Facebook Github Bot
parent 282099fc0f
commit 46152d53bf
  1. 2
      env/env.cc
  2. 41
      env/env_posix.cc
  3. 35
      env/env_test.cc
  4. 5
      env/io_posix.cc
  5. 6
      env/io_posix.h
  6. 22
      include/rocksdb/env.h
  7. 19
      tools/db_crashtest.py
  8. 192
      tools/db_stress.cc

2
env/env.cc vendored

@ -87,6 +87,8 @@ RandomAccessFile::~RandomAccessFile() {
WritableFile::~WritableFile() { WritableFile::~WritableFile() {
} }
MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
Logger::~Logger() {} Logger::~Logger() {}
Status Logger::Close() { Status Logger::Close() {

41
env/env_posix.cc vendored

@ -457,6 +457,47 @@ class PosixEnv : public Env {
return Status::OK(); return Status::OK();
} }
virtual Status NewMemoryMappedFileBuffer(
const std::string& fname,
unique_ptr<MemoryMappedFileBuffer>* result) override {
int fd = -1;
Status status;
while (fd < 0) {
IOSTATS_TIMER_GUARD(open_nanos);
fd = open(fname.c_str(), O_RDWR, 0644);
if (fd < 0) {
// Error while opening the file
if (errno == EINTR) {
continue;
}
status =
IOError("While open file for raw mmap buffer access", fname, errno);
break;
}
}
uint64_t size;
if (status.ok()) {
status = GetFileSize(fname, &size);
}
void* base;
if (status.ok()) {
base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
status = IOError("while mmap file for read", fname, errno);
}
}
if (status.ok()) {
result->reset(
new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
}
if (fd >= 0) {
// don't need to keep it open after mmap has been called
close(fd);
}
return status;
}
virtual Status NewDirectory(const std::string& name, virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) override { unique_ptr<Directory>* result) override {
result->reset(); result->reset();

35
env/env_test.cc vendored

@ -200,6 +200,41 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) {
} }
#endif #endif
TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {
const int kFileBytes = 1 << 15; // 32 KB
std::string expected_data;
std::string fname = test::TmpDir(env_) + "/" + "testfile";
{
unique_ptr<WritableFile> wfile;
const EnvOptions soptions;
ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
Random rnd(301);
test::RandomString(&rnd, kFileBytes, &expected_data);
ASSERT_OK(wfile->Append(expected_data));
}
std::unique_ptr<MemoryMappedFileBuffer> mmap_buffer;
Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer);
// it should be supported at least on linux
#if !defined(OS_LINUX)
if (status.IsNotSupported()) {
fprintf(stderr,
"skipping EnvPosixTest.MemoryMappedFileBuffer due to "
"unsupported Env::NewMemoryMappedFileBuffer\n");
return;
}
#endif // !defined(OS_LINUX)
ASSERT_OK(status);
ASSERT_NE(nullptr, mmap_buffer.get());
ASSERT_NE(nullptr, mmap_buffer->base);
ASSERT_EQ(kFileBytes, mmap_buffer->length);
std::string actual_data(static_cast<char*>(mmap_buffer->base),
mmap_buffer->length);
ASSERT_EQ(expected_data, actual_data);
}
TEST_P(EnvPosixTestWithParam, UnSchedule) { TEST_P(EnvPosixTestWithParam, UnSchedule) {
std::atomic<bool> called(false); std::atomic<bool> called(false);
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);

5
env/io_posix.cc vendored

@ -1052,6 +1052,11 @@ Status PosixRandomRWFile::Close() {
return Status::OK(); return Status::OK();
} }
PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
// TODO should have error handling though not much we can do...
munmap(this->base, length);
}
/* /*
* PosixDirectory * PosixDirectory
*/ */

6
env/io_posix.h vendored

@ -236,6 +236,12 @@ class PosixRandomRWFile : public RandomRWFile {
int fd_; int fd_;
}; };
struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
PosixMemoryMappedFileBuffer(void* _base, size_t _length)
: MemoryMappedFileBuffer(_base, _length) {}
virtual ~PosixMemoryMappedFileBuffer();
};
class PosixDirectory : public Directory { class PosixDirectory : public Directory {
public: public:
explicit PosixDirectory(int fd) : fd_(fd) {} explicit PosixDirectory(int fd) : fd_(fd) {}

@ -42,6 +42,7 @@ class SequentialFile;
class Slice; class Slice;
class WritableFile; class WritableFile;
class RandomRWFile; class RandomRWFile;
struct MemoryMappedFileBuffer;
class Directory; class Directory;
struct DBOptions; struct DBOptions;
struct ImmutableDBOptions; struct ImmutableDBOptions;
@ -204,6 +205,16 @@ class Env {
return Status::NotSupported("RandomRWFile is not implemented in this Env"); return Status::NotSupported("RandomRWFile is not implemented in this Env");
} }
// Opens `fname` as a memory-mapped file for read and write (in-place updates
// only, i.e., no appends). On success, stores a raw buffer covering the whole
// file in `*result`. The file must exist prior to this call.
virtual Status NewMemoryMappedFileBuffer(
const std::string& /*fname*/,
unique_ptr<MemoryMappedFileBuffer>* /*result*/) {
return Status::NotSupported(
"MemoryMappedFileBuffer is not implemented in this Env");
}
// Create an object that represents a directory. Will fail if directory // Create an object that represents a directory. Will fail if directory
// doesn't exist. If the directory exists, it will open the directory // doesn't exist. If the directory exists, it will open the directory
// and create a new Directory object. // and create a new Directory object.
@ -809,6 +820,17 @@ class RandomRWFile {
RandomRWFile& operator=(const RandomRWFile&) = delete; RandomRWFile& operator=(const RandomRWFile&) = delete;
}; };
// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer.
// Subclasses should release the mapping upon destruction.
struct MemoryMappedFileBuffer {
MemoryMappedFileBuffer(void* _base, size_t _length)
: base(_base), length(_length) {}
virtual ~MemoryMappedFileBuffer() = 0;
void* const base;
const size_t length;
};
// Directory object represents collection of files and implements // Directory object represents collection of files and implements
// filesystem operations that can be executed on directories. // filesystem operations that can be executed on directories.
class Directory { class Directory {

@ -16,14 +16,19 @@ import argparse
# for simple: # for simple:
# simple_default_params < blackbox|whitebox_simple_default_params < args # simple_default_params < blackbox|whitebox_simple_default_params < args
expected_values_file = tempfile.NamedTemporaryFile()
default_params = { default_params = {
"acquire_snapshot_one_in": 10000, "acquire_snapshot_one_in": 10000,
"block_size": 16384, "block_size": 16384,
"cache_size": 1048576, "cache_size": 1048576,
"clear_column_family_one_in": 0,
"compression_type": "snappy",
"use_clock_cache": "false", "use_clock_cache": "false",
"delpercent": 5, "delpercent": 5,
"destroy_db_initially": 0, "destroy_db_initially": 0,
"disable_wal": 0, "disable_wal": 0,
"expected_values_path": expected_values_file.name,
"allow_concurrent_memtable_write": 0, "allow_concurrent_memtable_write": 0,
"iterpercent": 10, "iterpercent": 10,
"max_background_compactions": 20, "max_background_compactions": 20,
@ -89,10 +94,13 @@ simple_default_params = {
"block_size": 16384, "block_size": 16384,
"cache_size": 1048576, "cache_size": 1048576,
"use_clock_cache": "false", "use_clock_cache": "false",
"clear_column_family_one_in": 0,
"column_families": 1, "column_families": 1,
"compression_type": "snappy",
"delpercent": 5, "delpercent": 5,
"destroy_db_initially": 0, "destroy_db_initially": 0,
"disable_wal": 0, "disable_wal": 0,
"expected_values_path": expected_values_file.name,
"allow_concurrent_memtable_write": lambda: random.randint(0, 1), "allow_concurrent_memtable_write": lambda: random.randint(0, 1),
"iterpercent": 10, "iterpercent": 10,
"max_background_compactions": 1, "max_background_compactions": 1,
@ -192,13 +200,16 @@ def blackbox_crash_main(args):
+ "threads=" + str(cmd_params['threads']) + "\n" + "threads=" + str(cmd_params['threads']) + "\n"
+ "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
+ "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + "subcompactions=" + str(cmd_params['subcompactions']) + "\n"
+ "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n")
while time.time() < exit_time: while time.time() < exit_time:
run_had_errors = False run_had_errors = False
killtime = time.time() + cmd_params['interval'] killtime = time.time() + cmd_params['interval']
cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items())) cmd = gen_cmd(dict(
cmd_params.items() +
{'db': dbname}.items()))
child = subprocess.Popen(cmd, stderr=subprocess.PIPE) child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
print("Running db_stress with pid=%d: %s\n\n" print("Running db_stress with pid=%d: %s\n\n"
@ -255,7 +266,8 @@ def whitebox_crash_main(args):
+ "threads=" + str(cmd_params['threads']) + "\n" + "threads=" + str(cmd_params['threads']) + "\n"
+ "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n"
+ "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n"
+ "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + "subcompactions=" + str(cmd_params['subcompactions']) + "\n"
+ "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n")
total_check_mode = 4 total_check_mode = 4
check_mode = 0 check_mode = 0
@ -360,6 +372,7 @@ def whitebox_crash_main(args):
# we need to clean up after ourselves -- only do this on test # we need to clean up after ourselves -- only do this on test
# success # success
shutil.rmtree(dbname, True) shutil.rmtree(dbname, True)
cmd_params.pop('expected_values_path', None)
check_mode = (check_mode + 1) % total_check_mode check_mode = (check_mode + 1) % total_check_mode
time.sleep(1) # time to stabilize after a kill time.sleep(1) # time to stabilize after a kill

@ -306,6 +306,14 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter"
DEFINE_string(db, "", "Use the db with the following name."); DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_string(
expected_values_path, "",
"File where the array of expected uint32_t values will be stored. If "
"provided and non-empty, the DB state will be verified against these "
"values after recovery. --max_key and --column_family must be kept the "
"same across invocations of this program that use the same "
"--expected_values_path.");
DEFINE_bool(verify_checksum, false, DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage"); "Verify checksum for every block read from storage");
@ -777,7 +785,11 @@ class Stats {
// State shared by all concurrent executions of the same benchmark. // State shared by all concurrent executions of the same benchmark.
class SharedState { class SharedState {
public: public:
static const uint32_t SENTINEL; // indicates a key may have any value (or not be present) as an operation on
// it is incomplete.
static const uint32_t UNKNOWN_SENTINEL;
// indicates a key should definitely be deleted
static const uint32_t DELETION_SENTINEL;
explicit SharedState(StressTest* stress_test) explicit SharedState(StressTest* stress_test)
: cv_(&mu_), : cv_(&mu_),
@ -795,7 +807,8 @@ class SharedState {
bg_thread_finished_(false), bg_thread_finished_(false),
stress_test_(stress_test), stress_test_(stress_test),
verification_failure_(false), verification_failure_(false),
no_overwrite_ids_(FLAGS_column_families) { no_overwrite_ids_(FLAGS_column_families),
values_(nullptr) {
// Pick random keys in each column family that will not experience // Pick random keys in each column family that will not experience
// overwrite // overwrite
@ -829,15 +842,69 @@ class SharedState {
} }
delete[] permutation; delete[] permutation;
size_t expected_values_size =
sizeof(std::atomic<uint32_t>) * FLAGS_column_families * max_key_;
bool values_init_needed = false;
Status status;
if (!FLAGS_expected_values_path.empty()) {
if (!std::atomic<uint32_t>{}.is_lock_free()) {
status = Status::InvalidArgument(
"Cannot use --expected_values_path on platforms without lock-free "
"std::atomic<uint32_t>");
}
if (status.ok() && FLAGS_clear_column_family_one_in > 0) {
status = Status::InvalidArgument(
"Cannot use --expected_values_path on when "
"--clear_column_family_one_in is greater than zero.");
}
size_t size;
if (status.ok()) {
status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size);
}
unique_ptr<WritableFile> wfile;
if (status.ok() && size == 0) {
const EnvOptions soptions;
status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile,
soptions);
}
if (status.ok() && size == 0) {
std::string buf(expected_values_size, '\0');
status = wfile->Append(buf);
values_init_needed = true;
}
if (status.ok()) {
status = FLAGS_env->NewMemoryMappedFileBuffer(
FLAGS_expected_values_path, &expected_mmap_buffer_);
}
if (status.ok()) {
assert(expected_mmap_buffer_->length == expected_values_size);
values_ =
static_cast<std::atomic<uint32_t>*>(expected_mmap_buffer_->base);
assert(values_ != nullptr);
} else {
fprintf(stderr, "Failed opening shared file '%s' with error: %s\n",
FLAGS_expected_values_path.c_str(), status.ToString().c_str());
assert(values_ == nullptr);
}
}
if (values_ == nullptr) {
values_ =
static_cast<std::atomic<uint32_t>*>(malloc(expected_values_size));
values_init_needed = true;
}
assert(values_ != nullptr);
if (values_init_needed) {
for (int i = 0; i < FLAGS_column_families; ++i) {
for (int j = 0; j < max_key_; ++j) {
Delete(i, j, false /* pending */);
}
}
}
if (FLAGS_test_batches_snapshots) { if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
return; return;
} }
values_.resize(FLAGS_column_families);
for (int i = 0; i < FLAGS_column_families; ++i) {
values_[i] = std::vector<uint32_t>(max_key_, SENTINEL);
}
long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_); long num_locks = static_cast<long>(max_key_ >> log2_keys_per_lock_);
if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) {
@ -944,27 +1011,57 @@ class SharedState {
} }
} }
std::atomic<uint32_t>& Value(int cf, int64_t key) const {
return values_[cf * max_key_ + key];
}
void ClearColumnFamily(int cf) { void ClearColumnFamily(int cf) {
std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */),
DELETION_SENTINEL);
} }
void Put(int cf, int64_t key, uint32_t value_base) { // @param pending True if the update may have started but is not yet
values_[cf][key] = value_base; // guaranteed finished. This is useful for crash-recovery testing when the
// process may crash before updating the expected values array.
void Put(int cf, int64_t key, uint32_t value_base, bool pending) {
if (!pending) {
// prevent expected-value update from reordering before Write
std::atomic_thread_fence(std::memory_order_release);
}
Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base,
std::memory_order_relaxed);
if (pending) {
// prevent Write from reordering before expected-value update
std::atomic_thread_fence(std::memory_order_release);
}
} }
uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; } uint32_t Get(int cf, int64_t key) const { return Value(cf, key); }
void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } // @param pending See comment above Put()
// Returns true if the key was not yet deleted.
bool Delete(int cf, int64_t key, bool pending) {
if (Value(cf, key) == DELETION_SENTINEL) {
return false;
}
Put(cf, key, DELETION_SENTINEL, pending);
return true;
}
void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } // @param pending See comment above Put()
// Returns true if the key was not yet deleted.
bool SingleDelete(int cf, int64_t key, bool pending) {
return Delete(cf, key, pending);
}
int DeleteRange(int cf, int64_t begin_key, int64_t end_key) { // @param pending See comment above Put()
// Returns number of keys deleted by the call.
int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) {
int covered = 0; int covered = 0;
for (int64_t key = begin_key; key < end_key; ++key) { for (int64_t key = begin_key; key < end_key; ++key) {
if (values_[cf][key] != SENTINEL) { if (Delete(cf, key, pending)) {
++covered; ++covered;
} }
values_[cf][key] = SENTINEL;
} }
return covered; return covered;
} }
@ -973,7 +1070,15 @@ class SharedState {
return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end();
} }
bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; } bool Exists(int cf, int64_t key) {
// UNKNOWN_SENTINEL counts as exists. That assures a key for which overwrite
// is disallowed can't be accidentally added a second time, in which case
// SingleDelete wouldn't be able to properly delete the key. It does allow
// the case where a SingleDelete might be added which covers nothing, but
// that's not a correctness issue.
uint32_t expected_value = Value(cf, key).load();
return expected_value != DELETION_SENTINEL;
}
uint32_t GetSeed() const { return seed_; } uint32_t GetSeed() const { return seed_; }
@ -985,6 +1090,10 @@ class SharedState {
bool BgThreadFinished() const { return bg_thread_finished_; } bool BgThreadFinished() const { return bg_thread_finished_; }
bool ShouldVerifyAtBeginning() const {
return expected_mmap_buffer_.get() != nullptr;
}
private: private:
port::Mutex mu_; port::Mutex mu_;
port::CondVar cv_; port::CondVar cv_;
@ -1006,13 +1115,15 @@ class SharedState {
// Keys that should not be overwritten // Keys that should not be overwritten
std::vector<std::unordered_set<size_t> > no_overwrite_ids_; std::vector<std::unordered_set<size_t> > no_overwrite_ids_;
std::vector<std::vector<uint32_t>> values_; std::atomic<uint32_t>* values_;
// Has to make it owned by a smart ptr as port::Mutex is not copyable // Has to make it owned by a smart ptr as port::Mutex is not copyable
// and storing it in the container may require copying depending on the impl. // and storing it in the container may require copying depending on the impl.
std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_; std::vector<std::vector<std::unique_ptr<port::Mutex> > > key_locks_;
std::unique_ptr<MemoryMappedFileBuffer> expected_mmap_buffer_;
}; };
const uint32_t SharedState::SENTINEL = 0xffffffff; const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe;
const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff;
// Per-thread state for concurrent executions of the same benchmark. // Per-thread state for concurrent executions of the same benchmark.
struct ThreadState { struct ThreadState {
@ -1320,6 +1431,13 @@ class StressTest {
while (!shared.AllInitialized()) { while (!shared.AllInitialized()) {
shared.GetCondVar()->Wait(); shared.GetCondVar()->Wait();
} }
if (shared.ShouldVerifyAtBeginning()) {
if (shared.HasVerificationFailedYet()) {
printf("Crash-recovery verification failed :(\n");
} else {
printf("Crash-recovery verification passed :)\n");
}
}
auto now = FLAGS_env->NowMicros(); auto now = FLAGS_env->NowMicros();
fprintf(stdout, "%s Starting database operations\n", fprintf(stdout, "%s Starting database operations\n",
@ -1415,6 +1533,9 @@ class StressTest {
ThreadState* thread = reinterpret_cast<ThreadState*>(v); ThreadState* thread = reinterpret_cast<ThreadState*>(v);
SharedState* shared = thread->shared; SharedState* shared = thread->shared;
if (shared->ShouldVerifyAtBeginning()) {
thread->shared->GetStressTest()->VerifyDb(thread);
}
{ {
MutexLock l(shared->GetMutex()); MutexLock l(shared->GetMutex());
shared->IncInitialized(); shared->IncInitialized();
@ -1996,7 +2117,7 @@ class StressTest {
} }
} else if (prefixBound <= prob_op && prob_op < writeBound) { } else if (prefixBound <= prob_op && prob_op < writeBound) {
// OPERATION write // OPERATION write
uint32_t value_base = thread->rand.Next(); uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL;
size_t sz = GenerateValue(value_base, value, sizeof(value)); size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz); Slice v(value, sz);
if (!FLAGS_test_batches_snapshots) { if (!FLAGS_test_batches_snapshots) {
@ -2024,7 +2145,8 @@ class StressTest {
break; break;
} }
} }
shared->Put(rand_column_family, rand_key, value_base); shared->Put(rand_column_family, rand_key, value_base,
true /* pending */);
Status s; Status s;
if (FLAGS_use_merge) { if (FLAGS_use_merge) {
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
@ -2057,6 +2179,8 @@ class StressTest {
#endif #endif
} }
} }
shared->Put(rand_column_family, rand_key, value_base,
false /* pending */);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str());
std::terminate(); std::terminate();
@ -2088,7 +2212,7 @@ class StressTest {
// Use delete if the key may be overwritten and a single deletion // Use delete if the key may be overwritten and a single deletion
// otherwise. // otherwise.
if (shared->AllowsOverwrite(rand_column_family, rand_key)) { if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
shared->Delete(rand_column_family, rand_key); shared->Delete(rand_column_family, rand_key, true /* pending */);
Status s; Status s;
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
s = db_->Delete(write_opts, column_family, key); s = db_->Delete(write_opts, column_family, key);
@ -2104,13 +2228,15 @@ class StressTest {
} }
#endif #endif
} }
shared->Delete(rand_column_family, rand_key, false /* pending */);
thread->stats.AddDeletes(1); thread->stats.AddDeletes(1);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate(); std::terminate();
} }
} else { } else {
shared->SingleDelete(rand_column_family, rand_key); shared->SingleDelete(rand_column_family, rand_key,
true /* pending */);
Status s; Status s;
if (!FLAGS_use_txn) { if (!FLAGS_use_txn) {
s = db_->SingleDelete(write_opts, column_family, key); s = db_->SingleDelete(write_opts, column_family, key);
@ -2126,6 +2252,8 @@ class StressTest {
} }
#endif #endif
} }
shared->SingleDelete(rand_column_family, rand_key,
false /* pending */);
thread->stats.AddSingleDeletes(1); thread->stats.AddSingleDeletes(1);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "single delete error: %s\n", fprintf(stderr, "single delete error: %s\n",
@ -2159,21 +2287,24 @@ class StressTest {
shared->GetMutexForKey(rand_column_family, rand_key + j))); shared->GetMutexForKey(rand_column_family, rand_key + j)));
} }
} }
shared->DeleteRange(rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width,
true /* pending */);
keystr = Key(rand_key); keystr = Key(rand_key);
key = keystr; key = keystr;
column_family = column_families_[rand_column_family]; column_family = column_families_[rand_column_family];
std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width);
Slice end_key = end_keystr; Slice end_key = end_keystr;
int covered = shared->DeleteRange(
rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width);
Status s = db_->DeleteRange(write_opts, column_family, key, end_key); Status s = db_->DeleteRange(write_opts, column_family, key, end_key);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "delete range error: %s\n", fprintf(stderr, "delete range error: %s\n",
s.ToString().c_str()); s.ToString().c_str());
std::terminate(); std::terminate();
} }
int covered = shared->DeleteRange(
rand_column_family, rand_key,
rand_key + FLAGS_range_deletion_width, false /* pending */);
thread->stats.AddRangeDeletions(1); thread->stats.AddRangeDeletions(1);
thread->stats.AddCoveredByRangeDeletions(covered); thread->stats.AddCoveredByRangeDeletions(covered);
} }
@ -2285,12 +2416,15 @@ class StressTest {
// compare value_from_db with the value in the shared state // compare value_from_db with the value in the shared state
char value[kValueMaxLen]; char value[kValueMaxLen];
uint32_t value_base = shared->Get(cf, key); uint32_t value_base = shared->Get(cf, key);
if (value_base == SharedState::SENTINEL && !strict) { if (value_base == SharedState::UNKNOWN_SENTINEL) {
return true;
}
if (value_base == SharedState::DELETION_SENTINEL && !strict) {
return true; return true;
} }
if (s.ok()) { if (s.ok()) {
if (value_base == SharedState::SENTINEL) { if (value_base == SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Unexpected value found", cf, key); VerificationAbort(shared, "Unexpected value found", cf, key);
return false; return false;
} }
@ -2305,7 +2439,7 @@ class StressTest {
return false; return false;
} }
} else { } else {
if (value_base != SharedState::SENTINEL) { if (value_base != SharedState::DELETION_SENTINEL) {
VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key);
return false; return false;
} }

Loading…
Cancel
Save