From 6c0c8dee7b59ed06008e5ea6434c32b86e33071d Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 15 Jul 2015 19:58:28 -0700 Subject: [PATCH] Fix data loss after DB recovery by not allowing flush/compaction to be scheduled until DB opened Summary: Previous run may leave some SST files with higher file numbers than manifest indicates. Compaction or flush may start to run while DB::Open() is still going on. SST file garbage collection may happen interleaving with compaction or flush, and overwrite files generated by compaction of flushes after they are generated. This might cause data loss. This possibility of interleaving is recently introduced. Fix it by not allowing compaction or flush to be scheduled before DB::Open() finishes. Test Plan: Add a unit test. This verification will have a chance to fail without the fix but doesn't fix without the fix. Reviewers: kradhakrishnan, anthony, yhchiang, IslamAbdelRahman, igor Reviewed By: igor Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D42399 --- db/db_impl.cc | 12 ++++- db/fault_injection_test.cc | 102 +++++++++++++++++++++++++++++++------ 2 files changed, 96 insertions(+), 18 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index c626813c3..c4bca30ae 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2059,6 +2059,10 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd) { void DBImpl::MaybeScheduleFlushOrCompaction() { mutex_.AssertHeld(); + if (!opened_successfully_) { + // Compaction may introduce data race to DB open + return; + } if (bg_work_gate_closed_) { // gate closed for background work return; @@ -2609,6 +2613,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, mutex_.Unlock(); status = compaction_job.Run(); + TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun"); mutex_.Lock(); compaction_job.Install(&status, *c->mutable_cf_options(), &mutex_); @@ -4317,11 +4322,14 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, } } } - + TEST_SYNC_POINT("DBImpl::Open:Opened"); + if (s.ok()) { + impl->opened_successfully_ = true; + impl->MaybeScheduleFlushOrCompaction(); + } impl->mutex_.Unlock(); if (s.ok()) { - impl->opened_successfully_ = true; Log(InfoLogLevel::INFO_LEVEL, impl->db_options_.info_log, "DB pointer %p", impl); *dbptr = impl; diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 67bceb6fc..efb2ac353 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -25,6 +25,7 @@ #include "util/logging.h" #include "util/mock_env.h" #include "util/mutexlock.h" +#include "util/sync_point.h" #include "util/testharness.h" #include "util/testutil.h" @@ -186,6 +187,13 @@ class FaultInjectionTestEnv : public EnvWrapper { Status NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + // Not allow overwriting files + if (target()->FileExists(fname)) { + return Status::Corruption("File already exists."); + } Status s = target()->NewWritableFile(fname, result, soptions); if (s.ok()) { result->reset(new TestWritableFile(fname, std::move(*result), this)); @@ -202,6 +210,9 @@ class FaultInjectionTestEnv : public EnvWrapper { } virtual Status DeleteFile(const std::string& f) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status s = EnvWrapper::DeleteFile(f); if (!s.ok()) { fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), @@ -216,6 +227,9 @@ class FaultInjectionTestEnv : public EnvWrapper { virtual Status RenameFile(const std::string& s, const std::string& t) override { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status ret = EnvWrapper::RenameFile(s, t); if (ret.ok()) { @@ -374,8 +388,11 @@ TestWritableFile::~TestWritableFile() { } Status TestWritableFile::Append(const Slice& data) { + if (!env_->IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } Status s = target_->Append(data); - if (s.ok() && env_->IsFilesystemActive()) { + if (s.ok()) { state_.pos_ += data.size(); } return s; @@ -545,33 +562,34 @@ class FaultInjectionTest : public testing::Test { ASSERT_OK(s); } - void Build(const WriteOptions& write_options, int start_idx, int num_vals) { + void Build(const WriteOptions& write_options, int start_idx, int num_vals, + bool sequential = true) { std::string key_space, value_space; WriteBatch batch; for (int i = start_idx; i < start_idx + num_vals; i++) { - Slice key = Key(i, &key_space); + Slice key = Key(sequential, i, &key_space); batch.Clear(); batch.Put(key, Value(i, &value_space)); ASSERT_OK(db_->Write(write_options, &batch)); } } - Status ReadValue(int i, std::string* val) const { + Status ReadValue(int i, std::string* val, bool sequential) const { std::string key_space, value_space; - Slice key = Key(i, &key_space); + Slice key = Key(sequential, i, &key_space); Value(i, &value_space); ReadOptions options; return db_->Get(options, key, val); } - Status Verify(int start_idx, int num_vals, - ExpectedVerifResult expected) const { + Status Verify(int start_idx, int num_vals, ExpectedVerifResult expected, + bool seqeuntial = true) const { std::string val; std::string value_space; Status s; for (int i = start_idx; i < start_idx + num_vals && s.ok(); i++) { Value(i, &value_space); - s = ReadValue(i, &val); + s = ReadValue(i, &val, seqeuntial); if (s.ok()) { EXPECT_EQ(value_space, val); } @@ -591,9 +609,16 @@ class FaultInjectionTest : public testing::Test { } // Return the ith key - Slice Key(int i, std::string* storage) const { + Slice Key(bool seqeuntial, int i, std::string* storage) const { + int num = i; + if (!seqeuntial) { + // random transfer + const int m = 0x5bd1e995; + num *= m; + num ^= num << 24; + } char buf[100]; - snprintf(buf, sizeof(buf), "%016d", i); + snprintf(buf, sizeof(buf), "%016d", num); storage->assign(buf, strlen(buf)); return Slice(*storage); } @@ -773,14 +798,14 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) { write_options.sync = false; std::string key_space, value_space; - ASSERT_OK( - db_->Put(write_options, Key(1, &key_space), Value(1, &value_space))); + ASSERT_OK(db_->Put(write_options, Key(true, 1, &key_space), + Value(1, &value_space))); FlushOptions flush_options; flush_options.wait = false; ASSERT_OK(db_->Flush(flush_options)); write_options.sync = true; - ASSERT_OK( - db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); + ASSERT_OK(db_->Put(write_options, Key(true, 2, &key_space), + Value(2, &value_space))); env_->SetFilesystemActive(false); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); @@ -789,14 +814,59 @@ TEST_F(FaultInjectionTest, DISABLED_WriteOptionSyncTest) { ASSERT_OK(OpenDB()); std::string val; Value(2, &value_space); - ASSERT_OK(ReadValue(2, &val)); + ASSERT_OK(ReadValue(2, &val, true)); ASSERT_EQ(value_space, val); Value(1, &value_space); - ASSERT_OK(ReadValue(1, &val)); + ASSERT_OK(ReadValue(1, &val, true)); ASSERT_EQ(value_space, val); } +TEST_F(FaultInjectionTest, UninstalledCompaction) { + options_.target_file_size_base = 32 * 1024; + options_.write_buffer_size = 100 << 10; // 100KB + options_.level0_file_num_compaction_trigger = 6; + options_.level0_stop_writes_trigger = 1 << 10; + options_.level0_slowdown_writes_trigger = 1 << 10; + options_.max_background_compactions = 1; + OpenDB(); + + rocksdb::SyncPoint::GetInstance()->LoadDependency({ + {"FaultInjectionTest::FaultTest:0", "DBImpl::BGWorkCompaction"}, + {"CompactionJob::Run():End", "FaultInjectionTest::FaultTest:1"}, + {"FaultInjectionTest::FaultTest:2", + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}, + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + int kNumKeys = 1000; + Build(WriteOptions(), 0, kNumKeys, false); + FlushOptions flush_options; + flush_options.wait = true; + db_->Flush(flush_options); + ASSERT_OK(db_->Put(WriteOptions(), "", "")); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:0"); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:1"); + env_->SetFilesystemActive(false); + TEST_SYNC_POINT("FaultInjectionTest::FaultTest:2"); + CloseDB(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + ResetDBState(kResetDropUnsyncedData); + + std::atomic opened(false); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::Open:Opened", [&](void* arg) { opened.store(true); }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BGWorkCompaction", + [&](void* arg) { ASSERT_TRUE(opened.load()); }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(OpenDB()); + static_cast(db_)->TEST_WaitForCompact(); + ASSERT_OK(Verify(0, kNumKeys, FaultInjectionTest::kValExpectFound, false)); + ASSERT_OK(db_->Put(WriteOptions(), "", "")); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} + } // namespace rocksdb int main(int argc, char** argv) {