// Copyright (c) 2015, Facebook, Inc. All rights reserved. // This source code is licensed under the BSD-style license found in the // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. // // Copyright 2014 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. // This test uses a custom Env to keep track of the state of a filesystem as of // the last "sync". It then checks for data loss errors by purposely dropping // file data (or entire files) not protected by a "sync". #include #include #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" #include "db/version_set.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/table.h" #include "rocksdb/write_batch.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" #include "util/testutil.h" namespace rocksdb { static const int kValueSize = 1000; static const int kMaxNumValues = 2000; static const size_t kNumIterations = 3; class TestWritableFile; class FaultInjectionTestEnv; namespace { // Assume a filename, and not a directory name like "/foo/bar/" static std::string GetDirName(const std::string filename) { size_t found = filename.find_last_of("/\\"); if (found == std::string::npos) { return ""; } else { return filename.substr(0, found); } } Status SyncDir(const std::string& dir) { // As this is a test it isn't required to *actually* sync this directory. return Status::OK(); } // A basic file truncation function suitable for this test. 
// Reads up to `length` bytes from the start of `filename`, writes them to
// "truncate.tmp" in the same directory (as computed by GetDirName), and then
// renames that temporary file over `filename`. All I/O goes through
// Env::Default().
//
// Returns the first non-OK status encountered, or OK once the rename succeeds.
Status Truncate(const std::string& filename, uint64_t length) {
  rocksdb::Env* env = rocksdb::Env::Default();

  unique_ptr<SequentialFile> orig_file;
  const EnvOptions options;
  Status s = env->NewSequentialFile(filename, &orig_file, options);
  if (!s.ok()) {
    return s;
  }

  // The read buffer is owned by a smart pointer so it is released on every
  // exit path; `result` may point into it, so it has to outlive the Append().
  unique_ptr<char[]> scratch(new char[length]);
  rocksdb::Slice result;
  s = orig_file->Read(length, &result, scratch.get());
  if (!s.ok()) {
    return s;
  }

  const std::string tmp_name = GetDirName(filename) + "/truncate.tmp";
  unique_ptr<WritableFile> tmp_file;
  s = env->NewWritableFile(tmp_name, &tmp_file, options);
  if (!s.ok()) {
    return s;
  }

  s = tmp_file->Append(result);
  if (s.ok()) {
    s = env->RenameFile(tmp_name, filename);
  }
  if (!s.ok()) {
    // Best-effort cleanup so that neither a failed append nor a failed rename
    // leaves the temporary file behind. The original error is what callers
    // need to see, so the status of the delete is intentionally not reported.
    env->DeleteFile(tmp_name);
  }
  return s;
}

// Write-progress bookkeeping for one file. All positions start at -1;
// TestWritableFile sets pos_ to 0 on construction and advances it by the
// number of bytes handed to each successful Append().
struct FileState {
  std::string filename_;
  ssize_t pos_;                // bytes appended so far
  ssize_t pos_at_last_sync_;   // value of pos_ at the last successful Sync()
  ssize_t pos_at_last_flush_;  // value of pos_ at the last successful Flush()

  explicit FileState(const std::string& filename)
      : filename_(filename),
        pos_(-1),
        pos_at_last_sync_(-1),
        pos_at_last_flush_(-1) {}

  FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {}

  // True when nothing has been written, or when the last sync covered every
  // byte written.
  bool IsFullySynced() const {
    return pos_ <= 0 || pos_ == pos_at_last_sync_;
  }

  // Truncates the file to its last synced position (0 if it was never
  // synced).
  Status DropUnsyncedData() const;
};

}  // anonymous namespace

// A wrapper around WritableFile which informs another Env whenever this file
// is written to or sync'ed.
class TestWritableFile : public WritableFile { public: explicit TestWritableFile(const std::string& fname, unique_ptr&& f, FaultInjectionTestEnv* env); virtual ~TestWritableFile(); virtual Status Append(const Slice& data); virtual Status Close(); virtual Status Flush(); virtual Status Sync(); private: FileState state_; unique_ptr target_; bool writable_file_opened_; FaultInjectionTestEnv* env_; Status SyncParent(); }; class FaultInjectionTestEnv : public EnvWrapper { public: explicit FaultInjectionTestEnv(Env* base) : EnvWrapper(base), filesystem_active_(true) {} virtual ~FaultInjectionTestEnv() { } Status NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) { Status s = target()->NewWritableFile(fname, result, soptions); if (s.ok()) { result->reset(new TestWritableFile(fname, std::move(*result), this)); // WritableFile doesn't append to files, so if the same file is opened // again then it will be truncated - so forget our saved state. UntrackFile(fname); MutexLock l(&mutex_); new_files_since_last_dir_sync_.insert(fname); } return s; } virtual Status DeleteFile(const std::string& f) { Status s = EnvWrapper::DeleteFile(f); ASSERT_OK(s); if (s.ok()) { UntrackFile(f); } return s; } virtual Status RenameFile(const std::string& s, const std::string& t) { Status ret = EnvWrapper::RenameFile(s, t); if (ret.ok()) { MutexLock l(&mutex_); if (db_file_state_.find(s) != db_file_state_.end()) { db_file_state_[t] = db_file_state_[s]; db_file_state_.erase(s); } if (new_files_since_last_dir_sync_.erase(s) != 0) { assert(new_files_since_last_dir_sync_.find(t) == new_files_since_last_dir_sync_.end()); new_files_since_last_dir_sync_.insert(t); } } return ret; } void WritableFileClosed(const FileState& state) { MutexLock l(&mutex_); db_file_state_[state.filename_] = state; } Status DropUnsyncedFileData() { Status s; MutexLock l(&mutex_); for (std::map::const_iterator it = db_file_state_.begin(); s.ok() && it != db_file_state_.end(); ++it) { const 
FileState& state = it->second; if (!state.IsFullySynced()) { s = state.DropUnsyncedData(); } } return s; } Status DeleteFilesCreatedAfterLastDirSync() { // Because DeleteFile access this container make a copy to avoid deadlock mutex_.Lock(); std::set new_files(new_files_since_last_dir_sync_.begin(), new_files_since_last_dir_sync_.end()); mutex_.Unlock(); Status s; std::set::const_iterator it; for (it = new_files.begin(); s.ok() && it != new_files.end(); ++it) { s = DeleteFile(*it); } return s; } void DirWasSynced() { MutexLock l(&mutex_); new_files_since_last_dir_sync_.clear(); } bool IsFileCreatedSinceLastDirSync(const std::string& filename) { MutexLock l(&mutex_); return new_files_since_last_dir_sync_.find(filename) != new_files_since_last_dir_sync_.end(); } void ResetState() { MutexLock l(&mutex_); db_file_state_.clear(); new_files_since_last_dir_sync_.clear(); SetFilesystemActive(true); } void UntrackFile(const std::string& f) { MutexLock l(&mutex_); db_file_state_.erase(f); new_files_since_last_dir_sync_.erase(f); } // Setting the filesystem to inactive is the test equivalent to simulating a // system reset. Setting to inactive will freeze our saved filesystem state so // that it will stop being recorded. It can then be reset back to the state at // the time of the reset. bool IsFilesystemActive() const { return filesystem_active_; } void SetFilesystemActive(bool active) { filesystem_active_ = active; } private: port::Mutex mutex_; std::map db_file_state_; std::set new_files_since_last_dir_sync_; bool filesystem_active_; // Record flushes, syncs, writes }; Status FileState::DropUnsyncedData() const { ssize_t sync_pos = pos_at_last_sync_ == -1 ? 
0 : pos_at_last_sync_; return Truncate(filename_, sync_pos); } TestWritableFile::TestWritableFile(const std::string& fname, unique_ptr&& f, FaultInjectionTestEnv* env) : state_(fname), target_(std::move(f)), writable_file_opened_(true), env_(env) { assert(target_ != nullptr); state_.pos_ = 0; } TestWritableFile::~TestWritableFile() { if (writable_file_opened_) { Close(); } } Status TestWritableFile::Append(const Slice& data) { Status s = target_->Append(data); if (s.ok() && env_->IsFilesystemActive()) { state_.pos_ += data.size(); } return s; } Status TestWritableFile::Close() { writable_file_opened_ = false; Status s = target_->Close(); if (s.ok()) { env_->WritableFileClosed(state_); } return s; } Status TestWritableFile::Flush() { Status s = target_->Flush(); if (s.ok() && env_->IsFilesystemActive()) { state_.pos_at_last_flush_ = state_.pos_; } return s; } Status TestWritableFile::Sync() { if (!env_->IsFilesystemActive()) { return Status::OK(); } // Ensure new files referred to by the manifest are in the filesystem. 
Status s = target_->Sync(); if (s.ok()) { state_.pos_at_last_sync_ = state_.pos_; } if (env_->IsFileCreatedSinceLastDirSync(state_.filename_)) { Status ps = SyncParent(); if (s.ok() && !ps.ok()) { s = ps; } } return s; } Status TestWritableFile::SyncParent() { Status s = SyncDir(GetDirName(state_.filename_)); if (s.ok()) { env_->DirWasSynced(); } return s; } class FaultInjectionTest { public: enum ExpectedVerifResult { VAL_EXPECT_NO_ERROR, VAL_EXPECT_ERROR }; enum ResetMethod { RESET_DROP_UNSYNCED_DATA, RESET_DELETE_UNSYNCED_FILES }; FaultInjectionTestEnv* env_; std::string dbname_; shared_ptr tiny_cache_; Options options_; DB* db_; FaultInjectionTest() : env_(NULL), db_(NULL) { NewDB(); } ~FaultInjectionTest() { ASSERT_OK(TearDown()); } Status NewDB() { assert(db_ == NULL); assert(tiny_cache_ == nullptr); assert(env_ == NULL); env_ = new FaultInjectionTestEnv(Env::Default()); options_ = Options(); options_.env = env_; options_.paranoid_checks = true; BlockBasedTableOptions table_options; tiny_cache_ = NewLRUCache(100); table_options.block_cache = tiny_cache_; options_.table_factory.reset(NewBlockBasedTableFactory(table_options)); dbname_ = test::TmpDir() + "/fault_test"; options_.create_if_missing = true; Status s = OpenDB(); options_.create_if_missing = false; return s; } Status SetUp() { Status s = TearDown(); if (s.ok()) { s = NewDB(); } return s; } Status TearDown() { CloseDB(); Status s = DestroyDB(dbname_, Options()); delete env_; env_ = NULL; tiny_cache_.reset(); return s; } void Build(int start_idx, int num_vals) { std::string key_space, value_space; WriteBatch batch; for (int i = start_idx; i < start_idx + num_vals; i++) { Slice key = Key(i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); WriteOptions options; ASSERT_OK(db_->Write(options, &batch)); } } Status ReadValue(int i, std::string* val) const { std::string key_space, value_space; Slice key = Key(i, &key_space); Value(i, &value_space); ReadOptions options; return 
db_->Get(options, key, val); } Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected) const { std::string val; std::string value_space; Status s; for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) { Value(i, &value_space); s = ReadValue(i, &val); if (expected == VAL_EXPECT_NO_ERROR) { if (s.ok()) { ASSERT_EQ(value_space, val); } } else if (s.ok()) { fprintf(stderr, "Expected an error at %d, but was OK\n", i); s = Status::IOError(dbname_, "Expected value error:"); } else { s = Status::OK(); // An expected error } } return s; } // Return the ith key Slice Key(int i, std::string* storage) const { char buf[100]; snprintf(buf, sizeof(buf), "%016d", i); storage->assign(buf, strlen(buf)); return Slice(*storage); } // Return the value to associate with the specified key Slice Value(int k, std::string* storage) const { Random r(k); return test::RandomString(&r, kValueSize, storage); } Status OpenDB() { delete db_; db_ = NULL; env_->ResetState(); return DB::Open(options_, dbname_, &db_); } void CloseDB() { delete db_; db_ = NULL; } void DeleteAllData() { Iterator* iter = db_->NewIterator(ReadOptions()); WriteOptions options; for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ASSERT_OK(db_->Delete(WriteOptions(), iter->key())); } delete iter; } void ResetDBState(ResetMethod reset_method) { switch (reset_method) { case RESET_DROP_UNSYNCED_DATA: ASSERT_OK(env_->DropUnsyncedFileData()); break; case RESET_DELETE_UNSYNCED_FILES: ASSERT_OK(env_->DeleteFilesCreatedAfterLastDirSync()); break; default: assert(false); } } void PartialCompactTestPreFault(int num_pre_sync, int num_post_sync) { DeleteAllData(); Build(0, num_pre_sync); db_->CompactRange(NULL, NULL); Build(num_pre_sync, num_post_sync); } void PartialCompactTestReopenWithFault(ResetMethod reset_method, int num_pre_sync, int num_post_sync) { env_->SetFilesystemActive(false); CloseDB(); ResetDBState(reset_method); ASSERT_OK(OpenDB()); ASSERT_OK(Verify(0, num_pre_sync, 
FaultInjectionTest::VAL_EXPECT_NO_ERROR)); ASSERT_OK(Verify(num_pre_sync, num_post_sync, FaultInjectionTest::VAL_EXPECT_ERROR)); } void NoWriteTestPreFault() { } void NoWriteTestReopenWithFault(ResetMethod reset_method) { CloseDB(); ResetDBState(reset_method); ASSERT_OK(OpenDB()); } }; TEST(FaultInjectionTest, FaultTest) { Random rnd(0); ASSERT_OK(SetUp()); for (size_t idx = 0; idx < kNumIterations; idx++) { int num_pre_sync = rnd.Uniform(kMaxNumValues); int num_post_sync = rnd.Uniform(kMaxNumValues); PartialCompactTestPreFault(num_pre_sync, num_post_sync); PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA, num_pre_sync, num_post_sync); NoWriteTestPreFault(); NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA); PartialCompactTestPreFault(num_pre_sync, num_post_sync); // No new files created so we expect all values since no files will be // dropped. PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES, num_pre_sync + num_post_sync, 0); NoWriteTestPreFault(); NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES); } } } // namespace rocksdb int main(int argc, char** argv) { return rocksdb::test::RunAllTests(); }