diff --git a/env/env.cc b/env/env.cc index 1943f6ad8..66e293337 100644 --- a/env/env.cc +++ b/env/env.cc @@ -87,6 +87,8 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } +MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {} + Logger::~Logger() {} Status Logger::Close() { diff --git a/env/env_posix.cc b/env/env_posix.cc index 707625f3f..a9895ec78 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -457,6 +457,47 @@ class PosixEnv : public Env { return Status::OK(); } + virtual Status NewMemoryMappedFileBuffer( + const std::string& fname, + unique_ptr* result) override { + int fd = -1; + Status status; + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), O_RDWR, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + status = + IOError("While open file for raw mmap buffer access", fname, errno); + break; + } + } + uint64_t size; + if (status.ok()) { + status = GetFileSize(fname, &size); + } + void* base; + if (status.ok()) { + base = mmap(nullptr, static_cast(size), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + status = IOError("while mmap file for read", fname, errno); + } + } + if (status.ok()) { + result->reset( + new PosixMemoryMappedFileBuffer(base, static_cast(size))); + } + if (fd >= 0) { + // don't need to keep it open after mmap has been called + close(fd); + } + return status; + } + virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { result->reset(); diff --git a/env/env_test.cc b/env/env_test.cc index 4a87094b5..3e5892437 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -200,6 +200,41 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) { } #endif +TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { + const int kFileBytes = 1 << 15; // 32 KB + std::string expected_data; + std::string fname = test::TmpDir(env_) + "/" + "testfile"; + { + unique_ptr wfile; + const EnvOptions soptions; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + Random rnd(301); + test::RandomString(&rnd, kFileBytes, &expected_data); + ASSERT_OK(wfile->Append(expected_data)); + } + + std::unique_ptr mmap_buffer; + Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer); + // it should be supported at least on linux +#if !defined(OS_LINUX) + if (status.IsNotSupported()) { + fprintf(stderr, + "skipping EnvPosixTest.MemoryMappedFileBuffer due to " + "unsupported Env::NewMemoryMappedFileBuffer\n"); + return; + } +#endif // !defined(OS_LINUX) + + ASSERT_OK(status); + ASSERT_NE(nullptr, mmap_buffer.get()); + ASSERT_NE(nullptr, mmap_buffer->base); + ASSERT_EQ(kFileBytes, mmap_buffer->length); + std::string actual_data(static_cast(mmap_buffer->base), + mmap_buffer->length); + ASSERT_EQ(expected_data, actual_data); +} + TEST_P(EnvPosixTestWithParam, UnSchedule) { std::atomic called(false); env_->SetBackgroundThreads(1, Env::LOW); diff --git a/env/io_posix.cc b/env/io_posix.cc index da6b516c9..a411b5639 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -1052,6 +1052,11 @@ Status PosixRandomRWFile::Close() { return Status::OK(); } +PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { + // TODO should have error handling though not much we can do... + munmap(this->base, length); +} + /* * PosixDirectory */ diff --git a/env/io_posix.h b/env/io_posix.h index f29a159ae..106f6df65 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -236,6 +236,12 @@ class PosixRandomRWFile : public RandomRWFile { int fd_; }; +struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer { + PosixMemoryMappedFileBuffer(void* _base, size_t _length) + : MemoryMappedFileBuffer(_base, _length) {} + virtual ~PosixMemoryMappedFileBuffer(); +}; + class PosixDirectory : public Directory { public: explicit PosixDirectory(int fd) : fd_(fd) {} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 55356a904..212b1770a 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -42,6 +42,7 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; +struct MemoryMappedFileBuffer; class Directory; struct DBOptions; struct ImmutableDBOptions; @@ -204,6 +205,16 @@ class Env { return Status::NotSupported("RandomRWFile is not implemented in this Env"); } + // Opens `fname` as a memory-mapped file for read and write (in-place updates + // only, i.e., no appends). On success, stores a raw buffer covering the whole + // file in `*result`. The file must exist prior to this call. + virtual Status NewMemoryMappedFileBuffer( + const std::string& /*fname*/, + unique_ptr* /*result*/) { + return Status::NotSupported( + "MemoryMappedFileBuffer is not implemented in this Env"); + } + // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory // and create a new Directory object. @@ -804,6 +815,17 @@ class RandomRWFile { RandomRWFile& operator=(const RandomRWFile&) = delete; }; +// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer. +// Subclasses should release the mapping upon destruction. +struct MemoryMappedFileBuffer { + MemoryMappedFileBuffer(void* _base, size_t _length) + : base(_base), length(_length) {} + virtual ~MemoryMappedFileBuffer() = 0; + + void* const base; + const size_t length; +}; + // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class Directory { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index ae65904ea..18162f1d6 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -16,14 +16,19 @@ import argparse # for simple: # simple_default_params < blackbox|whitebox_simple_default_params < args +expected_values_file = tempfile.NamedTemporaryFile() + default_params = { "acquire_snapshot_one_in": 10000, "block_size": 16384, "cache_size": 1048576, + "clear_column_family_one_in": 0, + "compression_type": "snappy", "use_clock_cache": "false", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": 0, "iterpercent": 10, "max_background_compactions": 20, @@ -88,10 +93,13 @@ simple_default_params = { "block_size": 16384, "cache_size": 1048576, "use_clock_cache": "false", + "clear_column_family_one_in": 0, "column_families": 1, + "compression_type": "snappy", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, + "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "iterpercent": 10, "max_background_compactions": 1, @@ -190,13 +198,16 @@ def blackbox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") while time.time() < exit_time: run_had_errors = False killtime = time.time() + cmd_params['interval'] - cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items())) + cmd = gen_cmd(dict( + cmd_params.items() + + {'db': dbname}.items())) child = subprocess.Popen(cmd, stderr=subprocess.PIPE) print("Running db_stress with pid=%d: %s\n\n" @@ -253,7 +264,8 @@ def whitebox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" + + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") total_check_mode = 4 check_mode = 0 diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b2d6044cc..ecd3a4b64 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -297,6 +297,14 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter" DEFINE_string(db, "", "Use the db with the following name."); +DEFINE_string( + expected_values_path, "", + "File where the array of expected uint32_t values will be stored. If " + "provided and non-empty, the DB state will be verified against these " + "values after recovery. --max_key and --column_family must be kept the " + "same across invocations of this program that use the same " + "--expected_values_path."); + DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); @@ -768,7 +776,11 @@ class Stats { // State shared by all concurrent executions of the same benchmark. class SharedState { public: - static const uint32_t SENTINEL; + // indicates a key may have any value (or not be present) as an operation on + // it is incomplete. + static const uint32_t UNKNOWN_SENTINEL; + // indicates a key should definitely be deleted + static const uint32_t DELETION_SENTINEL; explicit SharedState(StressTest* stress_test) : cv_(&mu_), @@ -786,7 +798,8 @@ class SharedState { bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false), - no_overwrite_ids_(FLAGS_column_families) { + no_overwrite_ids_(FLAGS_column_families), + values_(nullptr) { // Pick random keys in each column family that will not experience // overwrite @@ -820,15 +833,69 @@ class SharedState { } delete[] permutation; + size_t expected_values_size = + sizeof(std::atomic) * FLAGS_column_families * max_key_; + bool values_init_needed = false; + Status status; + if (!FLAGS_expected_values_path.empty()) { + if (!std::atomic{}.is_lock_free()) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on platforms without lock-free " + "std::atomic"); + } + if (status.ok() && FLAGS_clear_column_family_one_in > 0) { + status = Status::InvalidArgument( + "Cannot use --expected_values_path on when " + "--clear_column_family_one_in is greater than zero."); + } + size_t size; + if (status.ok()) { + status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size); + } + unique_ptr wfile; + if (status.ok() && size == 0) { + const EnvOptions soptions; + status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile, + soptions); + } + if (status.ok() && size == 0) { + std::string buf(expected_values_size, '\0'); + status = wfile->Append(buf); + values_init_needed = true; + } + if (status.ok()) { + status = FLAGS_env->NewMemoryMappedFileBuffer( + FLAGS_expected_values_path, &expected_mmap_buffer_); + } + if (status.ok()) { + assert(expected_mmap_buffer_->length == expected_values_size); + values_ = + static_cast*>(expected_mmap_buffer_->base); + assert(values_ != nullptr); + } else { + fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", + FLAGS_expected_values_path.c_str(), status.ToString().c_str()); + assert(values_ == nullptr); + } + } + if (values_ == nullptr) { + values_ = + static_cast*>(malloc(expected_values_size)); + values_init_needed = true; + } + assert(values_ != nullptr); + if (values_init_needed) { + for (int i = 0; i < FLAGS_column_families; ++i) { + for (int j = 0; j < max_key_; ++j) { + Delete(i, j, false /* pending */); + } + } + } + if (FLAGS_test_batches_snapshots) { fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; } - values_.resize(FLAGS_column_families); - - for (int i = 0; i < FLAGS_column_families; ++i) { - values_[i] = std::vector(max_key_, SENTINEL); - } long num_locks = static_cast(max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { @@ -935,27 +1002,57 @@ class SharedState { } } + std::atomic& Value(int cf, int64_t key) const { + return values_[cf * max_key_ + key]; + } + void ClearColumnFamily(int cf) { - std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); + std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), + DELETION_SENTINEL); } - void Put(int cf, int64_t key, uint32_t value_base) { - values_[cf][key] = value_base; + // @param pending True if the update may have started but is not yet + // guaranteed finished. This is useful for crash-recovery testing when the + // process may crash before updating the expected values array. + void Put(int cf, int64_t key, uint32_t value_base, bool pending) { + if (!pending) { + // prevent expected-value update from reordering before Write + std::atomic_thread_fence(std::memory_order_release); + } + Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base, + std::memory_order_relaxed); + if (pending) { + // prevent Write from reordering before expected-value update + std::atomic_thread_fence(std::memory_order_release); + } } - uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; } + uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } - void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool Delete(int cf, int64_t key, bool pending) { + if (Value(cf, key) == DELETION_SENTINEL) { + return false; + } + Put(cf, key, DELETION_SENTINEL, pending); + return true; + } - void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } + // @param pending See comment above Put() + // Returns true if the key was not yet deleted. + bool SingleDelete(int cf, int64_t key, bool pending) { + return Delete(cf, key, pending); + } - int DeleteRange(int cf, int64_t begin_key, int64_t end_key) { + // @param pending See comment above Put() + // Returns number of keys deleted by the call. + int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { int covered = 0; for (int64_t key = begin_key; key < end_key; ++key) { - if (values_[cf][key] != SENTINEL) { + if (Delete(cf, key, pending)) { ++covered; } - values_[cf][key] = SENTINEL; } return covered; } @@ -964,7 +1061,11 @@ class SharedState { return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); } - bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; } + bool Exists(int cf, int64_t key) { + uint32_t expected_value = Value(cf, key).load(); + assert(expected_value != UNKNOWN_SENTINEL); + return expected_value != DELETION_SENTINEL; + } uint32_t GetSeed() const { return seed_; } @@ -976,6 +1077,10 @@ class SharedState { bool BgThreadFinished() const { return bg_thread_finished_; } + bool ShouldVerifyAtBeginning() const { + return expected_mmap_buffer_.get() != nullptr; + } + private: port::Mutex mu_; port::CondVar cv_; @@ -997,13 +1102,15 @@ class SharedState { // Keys that should not be overwritten std::vector > no_overwrite_ids_; - std::vector> values_; + std::atomic* values_; // Has to make it owned by a smart ptr as port::Mutex is not copyable // and storing it in the container may require copying depending on the impl. std::vector > > key_locks_; + std::unique_ptr expected_mmap_buffer_; }; -const uint32_t SharedState::SENTINEL = 0xffffffff; +const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; +const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; // Per-thread state for concurrent executions of the same benchmark. struct ThreadState { @@ -1311,6 +1418,13 @@ class StressTest { while (!shared.AllInitialized()) { shared.GetCondVar()->Wait(); } + if (shared.ShouldVerifyAtBeginning()) { + if (shared.HasVerificationFailedYet()) { + printf("Crash-recovery verification failed :(\n"); + } else { + printf("Crash-recovery verification passed :)\n"); + } + } auto now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Starting database operations\n", @@ -1406,6 +1520,9 @@ class StressTest { ThreadState* thread = reinterpret_cast(v); SharedState* shared = thread->shared; + if (shared->ShouldVerifyAtBeginning()) { + thread->shared->GetStressTest()->VerifyDb(thread); + } { MutexLock l(shared->GetMutex()); shared->IncInitialized(); @@ -1987,7 +2104,7 @@ class StressTest { } } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - uint32_t value_base = thread->rand.Next(); + uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { @@ -2015,7 +2132,8 @@ class StressTest { break; } } - shared->Put(rand_column_family, rand_key, value_base); + shared->Put(rand_column_family, rand_key, value_base, + true /* pending */); Status s; if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -2048,6 +2166,8 @@ class StressTest { #endif } } + shared->Put(rand_column_family, rand_key, value_base, + false /* pending */); if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); std::terminate(); @@ -2079,7 +2199,7 @@ class StressTest { // Use delete if the key may be overwritten and a single deletion // otherwise. if (shared->AllowsOverwrite(rand_column_family, rand_key)) { - shared->Delete(rand_column_family, rand_key); + shared->Delete(rand_column_family, rand_key, true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->Delete(write_opts, column_family, key); @@ -2095,13 +2215,15 @@ class StressTest { } #endif } + shared->Delete(rand_column_family, rand_key, false /* pending */); thread->stats.AddDeletes(1); if (!s.ok()) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); std::terminate(); } } else { - shared->SingleDelete(rand_column_family, rand_key); + shared->SingleDelete(rand_column_family, rand_key, + true /* pending */); Status s; if (!FLAGS_use_txn) { s = db_->SingleDelete(write_opts, column_family, key); @@ -2117,6 +2239,8 @@ class StressTest { } #endif } + shared->SingleDelete(rand_column_family, rand_key, + false /* pending */); thread->stats.AddSingleDeletes(1); if (!s.ok()) { fprintf(stderr, "single delete error: %s\n", @@ -2150,21 +2274,24 @@ class StressTest { shared->GetMutexForKey(rand_column_family, rand_key + j))); } } + shared->DeleteRange(rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, + true /* pending */); keystr = Key(rand_key); key = keystr; column_family = column_families_[rand_column_family]; std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); Slice end_key = end_keystr; - int covered = shared->DeleteRange( - rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width); Status s = db_->DeleteRange(write_opts, column_family, key, end_key); if (!s.ok()) { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); std::terminate(); } + int covered = shared->DeleteRange( + rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width, false /* pending */); thread->stats.AddRangeDeletions(1); thread->stats.AddCoveredByRangeDeletions(covered); } @@ -2276,12 +2403,15 @@ class StressTest { // compare value_from_db with the value in the shared state char value[kValueMaxLen]; uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::SENTINEL && !strict) { + if (value_base == SharedState::UNKNOWN_SENTINEL) { + return true; + } + if (value_base == SharedState::DELETION_SENTINEL && !strict) { return true; } if (s.ok()) { - if (value_base == SharedState::SENTINEL) { + if (value_base == SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Unexpected value found", cf, key); return false; } @@ -2296,7 +2426,7 @@ class StressTest { return false; } } else { - if (value_base != SharedState::SENTINEL) { + if (value_base != SharedState::DELETION_SENTINEL) { VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); return false; }