diff --git a/env/env.cc b/env/env.cc index 66e293337..1943f6ad8 100644 --- a/env/env.cc +++ b/env/env.cc @@ -87,8 +87,6 @@ RandomAccessFile::~RandomAccessFile() { WritableFile::~WritableFile() { } -MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {} - Logger::~Logger() {} Status Logger::Close() { diff --git a/env/env_posix.cc b/env/env_posix.cc index 038d7c44a..707625f3f 100644 --- a/env/env_posix.cc +++ b/env/env_posix.cc @@ -457,47 +457,6 @@ class PosixEnv : public Env { return Status::OK(); } - virtual Status NewMemoryMappedFileBuffer( - const std::string& fname, - unique_ptr* result) override { - int fd = -1; - Status status; - while (fd < 0) { - IOSTATS_TIMER_GUARD(open_nanos); - fd = open(fname.c_str(), O_RDWR, 0644); - if (fd < 0) { - // Error while opening the file - if (errno == EINTR) { - continue; - } - status = - IOError("While open file for raw mmap buffer access", fname, errno); - break; - } - } - uint64_t size; - if (status.ok()) { - status = GetFileSize(fname, &size); - } - void* base = nullptr; - if (status.ok()) { - base = mmap(nullptr, static_cast(size), PROT_READ | PROT_WRITE, - MAP_SHARED, fd, 0); - if (base == MAP_FAILED) { - status = IOError("while mmap file for read", fname, errno); - } - } - if (status.ok()) { - result->reset( - new PosixMemoryMappedFileBuffer(base, static_cast(size))); - } - if (fd >= 0) { - // don't need to keep it open after mmap has been called - close(fd); - } - return status; - } - virtual Status NewDirectory(const std::string& name, unique_ptr* result) override { result->reset(); diff --git a/env/env_test.cc b/env/env_test.cc index 3e5892437..4a87094b5 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -200,41 +200,6 @@ TEST_F(EnvPosixTest, DISABLED_FilePermission) { } #endif -TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { - const int kFileBytes = 1 << 15; // 32 KB - std::string expected_data; - std::string fname = test::TmpDir(env_) + "/" + "testfile"; - { - unique_ptr wfile; - const EnvOptions soptions; - ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); - - Random rnd(301); - test::RandomString(&rnd, kFileBytes, &expected_data); - ASSERT_OK(wfile->Append(expected_data)); - } - - std::unique_ptr mmap_buffer; - Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer); - // it should be supported at least on linux -#if !defined(OS_LINUX) - if (status.IsNotSupported()) { - fprintf(stderr, - "skipping EnvPosixTest.MemoryMappedFileBuffer due to " - "unsupported Env::NewMemoryMappedFileBuffer\n"); - return; - } -#endif // !defined(OS_LINUX) - - ASSERT_OK(status); - ASSERT_NE(nullptr, mmap_buffer.get()); - ASSERT_NE(nullptr, mmap_buffer->base); - ASSERT_EQ(kFileBytes, mmap_buffer->length); - std::string actual_data(static_cast(mmap_buffer->base), - mmap_buffer->length); - ASSERT_EQ(expected_data, actual_data); -} - TEST_P(EnvPosixTestWithParam, UnSchedule) { std::atomic called(false); env_->SetBackgroundThreads(1, Env::LOW); diff --git a/env/io_posix.cc b/env/io_posix.cc index a411b5639..da6b516c9 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -1052,11 +1052,6 @@ Status PosixRandomRWFile::Close() { return Status::OK(); } -PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { - // TODO should have error handling though not much we can do... - munmap(this->base, length); -} - /* * PosixDirectory */ diff --git a/env/io_posix.h b/env/io_posix.h index 106f6df65..f29a159ae 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -236,12 +236,6 @@ class PosixRandomRWFile : public RandomRWFile { int fd_; }; -struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer { - PosixMemoryMappedFileBuffer(void* _base, size_t _length) - : MemoryMappedFileBuffer(_base, _length) {} - virtual ~PosixMemoryMappedFileBuffer(); -}; - class PosixDirectory : public Directory { public: explicit PosixDirectory(int fd) : fd_(fd) {} diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 1f1f06010..81b31bdbb 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -42,7 +42,6 @@ class SequentialFile; class Slice; class WritableFile; class RandomRWFile; -struct MemoryMappedFileBuffer; class Directory; struct DBOptions; struct ImmutableDBOptions; @@ -205,16 +204,6 @@ class Env { return Status::NotSupported("RandomRWFile is not implemented in this Env"); } - // Opens `fname` as a memory-mapped file for read and write (in-place updates - // only, i.e., no appends). On success, stores a raw buffer covering the whole - // file in `*result`. The file must exist prior to this call. - virtual Status NewMemoryMappedFileBuffer( - const std::string& /*fname*/, - unique_ptr* /*result*/) { - return Status::NotSupported( - "MemoryMappedFileBuffer is not implemented in this Env"); - } - // Create an object that represents a directory. Will fail if directory // doesn't exist. If the directory exists, it will open the directory // and create a new Directory object. @@ -820,17 +809,6 @@ class RandomRWFile { RandomRWFile& operator=(const RandomRWFile&) = delete; }; -// MemoryMappedFileBuffer object represents a memory-mapped file's raw buffer. -// Subclasses should release the mapping upon destruction. -struct MemoryMappedFileBuffer { - MemoryMappedFileBuffer(void* _base, size_t _length) - : base(_base), length(_length) {} - virtual ~MemoryMappedFileBuffer() = 0; - - void* const base; - const size_t length; -}; - // Directory object represents collection of files and implements // filesystem operations that can be executed on directories. class Directory { diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 6d24d3f19..01a36a6ac 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -16,19 +16,14 @@ import argparse # for simple: # simple_default_params < blackbox|whitebox_simple_default_params < args -expected_values_file = tempfile.NamedTemporaryFile() - default_params = { "acquire_snapshot_one_in": 10000, "block_size": 16384, "cache_size": 1048576, - "clear_column_family_one_in": 0, - "compression_type": "snappy", "use_clock_cache": "false", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, - "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": 0, "iterpercent": 10, "max_background_compactions": 20, @@ -94,13 +89,10 @@ simple_default_params = { "block_size": 16384, "cache_size": 1048576, "use_clock_cache": "false", - "clear_column_family_one_in": 0, "column_families": 1, - "compression_type": "snappy", "delpercent": 5, "destroy_db_initially": 0, "disable_wal": 0, - "expected_values_path": expected_values_file.name, "allow_concurrent_memtable_write": lambda: random.randint(0, 1), "iterpercent": 10, "max_background_compactions": 1, @@ -200,16 +192,13 @@ def blackbox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" - + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") while time.time() < exit_time: run_had_errors = False killtime = time.time() + cmd_params['interval'] - cmd = gen_cmd(dict( - cmd_params.items() + - {'db': dbname}.items())) + cmd = gen_cmd(dict(cmd_params.items() + {'db': dbname}.items())) child = subprocess.Popen(cmd, stderr=subprocess.PIPE) print("Running db_stress with pid=%d: %s\n\n" @@ -266,8 +255,7 @@ def whitebox_crash_main(args): + "threads=" + str(cmd_params['threads']) + "\n" + "ops_per_thread=" + str(cmd_params['ops_per_thread']) + "\n" + "write_buffer_size=" + str(cmd_params['write_buffer_size']) + "\n" - + "subcompactions=" + str(cmd_params['subcompactions']) + "\n" - + "expected_values_path=" + str(cmd_params['expected_values_path']) + "\n") + + "subcompactions=" + str(cmd_params['subcompactions']) + "\n") total_check_mode = 4 check_mode = 0 diff --git a/tools/db_stress.cc b/tools/db_stress.cc index b701c89f2..c88e4ba70 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -306,14 +306,6 @@ DEFINE_bool(use_block_based_filter, false, "use block based filter" DEFINE_string(db, "", "Use the db with the following name."); -DEFINE_string( - expected_values_path, "", - "File where the array of expected uint32_t values will be stored. If " - "provided and non-empty, the DB state will be verified against these " - "values after recovery. --max_key and --column_family must be kept the " - "same across invocations of this program that use the same " - "--expected_values_path."); - DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); @@ -785,11 +777,7 @@ class Stats { // State shared by all concurrent executions of the same benchmark. class SharedState { public: - // indicates a key may have any value (or not be present) as an operation on - // it is incomplete. - static const uint32_t UNKNOWN_SENTINEL; - // indicates a key should definitely be deleted - static const uint32_t DELETION_SENTINEL; + static const uint32_t SENTINEL; explicit SharedState(StressTest* stress_test) : cv_(&mu_), @@ -807,8 +795,7 @@ class SharedState { bg_thread_finished_(false), stress_test_(stress_test), verification_failure_(false), - no_overwrite_ids_(FLAGS_column_families), - values_(nullptr) { + no_overwrite_ids_(FLAGS_column_families) { // Pick random keys in each column family that will not experience // overwrite @@ -842,69 +829,15 @@ class SharedState { } delete[] permutation; - size_t expected_values_size = - sizeof(std::atomic) * FLAGS_column_families * max_key_; - bool values_init_needed = false; - Status status; - if (!FLAGS_expected_values_path.empty()) { - if (!std::atomic{}.is_lock_free()) { - status = Status::InvalidArgument( - "Cannot use --expected_values_path on platforms without lock-free " - "std::atomic"); - } - if (status.ok() && FLAGS_clear_column_family_one_in > 0) { - status = Status::InvalidArgument( - "Cannot use --expected_values_path on when " - "--clear_column_family_one_in is greater than zero."); - } - size_t size; - if (status.ok()) { - status = FLAGS_env->GetFileSize(FLAGS_expected_values_path, &size); - } - unique_ptr wfile; - if (status.ok() && size == 0) { - const EnvOptions soptions; - status = FLAGS_env->NewWritableFile(FLAGS_expected_values_path, &wfile, - soptions); - } - if (status.ok() && size == 0) { - std::string buf(expected_values_size, '\0'); - status = wfile->Append(buf); - values_init_needed = true; - } - if (status.ok()) { - status = FLAGS_env->NewMemoryMappedFileBuffer( - FLAGS_expected_values_path, &expected_mmap_buffer_); - } - if (status.ok()) { - assert(expected_mmap_buffer_->length == expected_values_size); - values_ = - static_cast*>(expected_mmap_buffer_->base); - assert(values_ != nullptr); - } else { - fprintf(stderr, "Failed opening shared file '%s' with error: %s\n", - FLAGS_expected_values_path.c_str(), status.ToString().c_str()); - assert(values_ == nullptr); - } - } - if (values_ == nullptr) { - values_ = - static_cast*>(malloc(expected_values_size)); - values_init_needed = true; - } - assert(values_ != nullptr); - if (values_init_needed) { - for (int i = 0; i < FLAGS_column_families; ++i) { - for (int j = 0; j < max_key_; ++j) { - Delete(i, j, false /* pending */); - } - } - } - if (FLAGS_test_batches_snapshots) { fprintf(stdout, "No lock creation because test_batches_snapshots set\n"); return; } + values_.resize(FLAGS_column_families); + + for (int i = 0; i < FLAGS_column_families; ++i) { + values_[i] = std::vector(max_key_, SENTINEL); + } long num_locks = static_cast(max_key_ >> log2_keys_per_lock_); if (max_key_ & ((1 << log2_keys_per_lock_) - 1)) { @@ -1011,57 +944,27 @@ class SharedState { } } - std::atomic& Value(int cf, int64_t key) const { - return values_[cf * max_key_ + key]; - } - void ClearColumnFamily(int cf) { - std::fill(&Value(cf, 0 /* key */), &Value(cf + 1, 0 /* key */), - DELETION_SENTINEL); + std::fill(values_[cf].begin(), values_[cf].end(), SENTINEL); } - // @param pending True if the update may have started but is not yet - // guaranteed finished. This is useful for crash-recovery testing when the - // process may crash before updating the expected values array. - void Put(int cf, int64_t key, uint32_t value_base, bool pending) { - if (!pending) { - // prevent expected-value update from reordering before Write - std::atomic_thread_fence(std::memory_order_release); - } - Value(cf, key).store(pending ? UNKNOWN_SENTINEL : value_base, - std::memory_order_relaxed); - if (pending) { - // prevent Write from reordering before expected-value update - std::atomic_thread_fence(std::memory_order_release); - } + void Put(int cf, int64_t key, uint32_t value_base) { + values_[cf][key] = value_base; } - uint32_t Get(int cf, int64_t key) const { return Value(cf, key); } + uint32_t Get(int cf, int64_t key) const { return values_[cf][key]; } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. - bool Delete(int cf, int64_t key, bool pending) { - if (Value(cf, key) == DELETION_SENTINEL) { - return false; - } - Put(cf, key, DELETION_SENTINEL, pending); - return true; - } + void Delete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } - // @param pending See comment above Put() - // Returns true if the key was not yet deleted. - bool SingleDelete(int cf, int64_t key, bool pending) { - return Delete(cf, key, pending); - } + void SingleDelete(int cf, int64_t key) { values_[cf][key] = SENTINEL; } - // @param pending See comment above Put() - // Returns number of keys deleted by the call. - int DeleteRange(int cf, int64_t begin_key, int64_t end_key, bool pending) { + int DeleteRange(int cf, int64_t begin_key, int64_t end_key) { int covered = 0; for (int64_t key = begin_key; key < end_key; ++key) { - if (Delete(cf, key, pending)) { + if (values_[cf][key] != SENTINEL) { ++covered; } + values_[cf][key] = SENTINEL; } return covered; } @@ -1070,11 +973,7 @@ class SharedState { return no_overwrite_ids_[cf].find(key) == no_overwrite_ids_[cf].end(); } - bool Exists(int cf, int64_t key) { - uint32_t expected_value = Value(cf, key).load(); - assert(expected_value != UNKNOWN_SENTINEL); - return expected_value != DELETION_SENTINEL; - } + bool Exists(int cf, int64_t key) { return values_[cf][key] != SENTINEL; } uint32_t GetSeed() const { return seed_; } @@ -1086,10 +985,6 @@ class SharedState { bool BgThreadFinished() const { return bg_thread_finished_; } - bool ShouldVerifyAtBeginning() const { - return expected_mmap_buffer_.get() != nullptr; - } - private: port::Mutex mu_; port::CondVar cv_; @@ -1111,15 +1006,13 @@ class SharedState { // Keys that should not be overwritten std::vector > no_overwrite_ids_; - std::atomic* values_; + std::vector> values_; // Has to make it owned by a smart ptr as port::Mutex is not copyable // and storing it in the container may require copying depending on the impl. std::vector > > key_locks_; - std::unique_ptr expected_mmap_buffer_; }; -const uint32_t SharedState::UNKNOWN_SENTINEL = 0xfffffffe; -const uint32_t SharedState::DELETION_SENTINEL = 0xffffffff; +const uint32_t SharedState::SENTINEL = 0xffffffff; // Per-thread state for concurrent executions of the same benchmark. struct ThreadState { @@ -1427,13 +1320,6 @@ class StressTest { while (!shared.AllInitialized()) { shared.GetCondVar()->Wait(); } - if (shared.ShouldVerifyAtBeginning()) { - if (shared.HasVerificationFailedYet()) { - printf("Crash-recovery verification failed :(\n"); - } else { - printf("Crash-recovery verification passed :)\n"); - } - } auto now = FLAGS_env->NowMicros(); fprintf(stdout, "%s Starting database operations\n", @@ -1529,9 +1415,6 @@ class StressTest { ThreadState* thread = reinterpret_cast(v); SharedState* shared = thread->shared; - if (shared->ShouldVerifyAtBeginning()) { - thread->shared->GetStressTest()->VerifyDb(thread); - } { MutexLock l(shared->GetMutex()); shared->IncInitialized(); @@ -2113,7 +1996,7 @@ class StressTest { } } else if (prefixBound <= prob_op && prob_op < writeBound) { // OPERATION write - uint32_t value_base = thread->rand.Next() % shared->UNKNOWN_SENTINEL; + uint32_t value_base = thread->rand.Next(); size_t sz = GenerateValue(value_base, value, sizeof(value)); Slice v(value, sz); if (!FLAGS_test_batches_snapshots) { @@ -2141,8 +2024,7 @@ class StressTest { break; } } - shared->Put(rand_column_family, rand_key, value_base, - true /* pending */); + shared->Put(rand_column_family, rand_key, value_base); Status s; if (FLAGS_use_merge) { if (!FLAGS_use_txn) { @@ -2175,8 +2057,6 @@ class StressTest { #endif } } - shared->Put(rand_column_family, rand_key, value_base, - false /* pending */); if (!s.ok()) { fprintf(stderr, "put or merge error: %s\n", s.ToString().c_str()); std::terminate(); @@ -2208,7 +2088,7 @@ class StressTest { // Use delete if the key may be overwritten and a single deletion // otherwise. if (shared->AllowsOverwrite(rand_column_family, rand_key)) { - shared->Delete(rand_column_family, rand_key, true /* pending */); + shared->Delete(rand_column_family, rand_key); Status s; if (!FLAGS_use_txn) { s = db_->Delete(write_opts, column_family, key); @@ -2224,15 +2104,13 @@ class StressTest { } #endif } - shared->Delete(rand_column_family, rand_key, false /* pending */); thread->stats.AddDeletes(1); if (!s.ok()) { fprintf(stderr, "delete error: %s\n", s.ToString().c_str()); std::terminate(); } } else { - shared->SingleDelete(rand_column_family, rand_key, - true /* pending */); + shared->SingleDelete(rand_column_family, rand_key); Status s; if (!FLAGS_use_txn) { s = db_->SingleDelete(write_opts, column_family, key); @@ -2248,8 +2126,6 @@ class StressTest { } #endif } - shared->SingleDelete(rand_column_family, rand_key, - false /* pending */); thread->stats.AddSingleDeletes(1); if (!s.ok()) { fprintf(stderr, "single delete error: %s\n", @@ -2283,24 +2159,21 @@ class StressTest { shared->GetMutexForKey(rand_column_family, rand_key + j))); } } - shared->DeleteRange(rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, - true /* pending */); keystr = Key(rand_key); key = keystr; column_family = column_families_[rand_column_family]; std::string end_keystr = Key(rand_key + FLAGS_range_deletion_width); Slice end_key = end_keystr; + int covered = shared->DeleteRange( + rand_column_family, rand_key, + rand_key + FLAGS_range_deletion_width); Status s = db_->DeleteRange(write_opts, column_family, key, end_key); if (!s.ok()) { fprintf(stderr, "delete range error: %s\n", s.ToString().c_str()); std::terminate(); } - int covered = shared->DeleteRange( - rand_column_family, rand_key, - rand_key + FLAGS_range_deletion_width, false /* pending */); thread->stats.AddRangeDeletions(1); thread->stats.AddCoveredByRangeDeletions(covered); } @@ -2412,15 +2285,12 @@ class StressTest { // compare value_from_db with the value in the shared state char value[kValueMaxLen]; uint32_t value_base = shared->Get(cf, key); - if (value_base == SharedState::UNKNOWN_SENTINEL) { - return true; - } - if (value_base == SharedState::DELETION_SENTINEL && !strict) { + if (value_base == SharedState::SENTINEL && !strict) { return true; } if (s.ok()) { - if (value_base == SharedState::DELETION_SENTINEL) { + if (value_base == SharedState::SENTINEL) { VerificationAbort(shared, "Unexpected value found", cf, key); return false; } @@ -2435,7 +2305,7 @@ class StressTest { return false; } } else { - if (value_base != SharedState::DELETION_SENTINEL) { + if (value_base != SharedState::SENTINEL) { VerificationAbort(shared, "Value not found: " + s.ToString(), cf, key); return false; }