From 147697420ab60c85deb13cea4532844e3f2ae352 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Wed, 14 Nov 2018 20:52:21 -0800 Subject: [PATCH] Rollback memtable flush upon atomic flush fail (#4641) Summary: This fixes an assertion. An atomic flush can have multiple flush jobs. Some of them may fail. If any of them fails, we need to rollback all of them. For the flush jobs that do fail, we already call `RollbackMemTableFlush` in `FlushJob::Run`. The tricky part is for flush jobs that have completed successfully. We need to call `RollbackMemTableFlush` for them as well. The newly added DBAtomicFlushTest.AtomicFlushRollbackSomeJobs will SigAbort without the corresponding change in AtomicFlushMemTablesToOutputFiles. Pull Request resolved: https://github.com/facebook/rocksdb/pull/4641 Differential Revision: D12943649 Pulled By: riversand963 fbshipit-source-id: c66a4a664a1e0938e938fd41edc5a70c34cdd868 --- db/db_flush_test.cc | 46 ++++++++++++++++++++++++++++++++++ db/db_impl_compaction_flush.cc | 26 +++++++++++++++---- 2 files changed, 67 insertions(+), 5 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 5ae4f78ed..e4461b23f 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -355,6 +355,52 @@ TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) { SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(DBAtomicFlushTest, AtomicFlushRollbackSomeJobs) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(env_)); + Options options = CurrentOptions(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + options.env = fault_injection_env.get(); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1", + "DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"}, + {"DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2", + "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + CreateAndReopenWithCF({"pikachu", "eevee"}, options); + size_t num_cfs = handles_.size(); + ASSERT_EQ(3, num_cfs); + WriteOptions wopts; + wopts.disableWAL = true; + for (size_t i = 0; i != num_cfs; ++i) { + int cf_id = static_cast(i); + ASSERT_OK(Put(cf_id, "key", "value", wopts)); + } + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(dbfull()->Flush(flush_opts, handles_)); + TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:1"); + fault_injection_env->SetFilesystemActive(false); + TEST_SYNC_POINT("DBAtomicFlushTest::AtomicFlushRollbackSomeJobs:2"); + for (auto* cfh : handles_) { + dbfull()->TEST_WaitForFlushMemTable(cfh); + } + for (size_t i = 0; i != num_cfs; ++i) { + auto cfh = static_cast(handles_[i]); + ASSERT_EQ(1, cfh->cfd()->imm()->NumNotFlushed()); + ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty()); + } + fault_injection_env->SetFilesystemActive(true); + Destroy(options); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index cafcbf3dc..1cb9ec323 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -307,7 +307,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( assert(num_cfs == static_cast(jobs.size())); for (int i = 0; i != num_cfs; ++i) { - file_meta.emplace_back(FileMetaData()); + file_meta.emplace_back(); #ifndef ROCKSDB_LITE const MutableCFOptions& mutable_cf_options = @@ -335,7 +335,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (s.ok()) { // TODO (yanqin): parallelize jobs with threads. - for (int i = 0; i != num_cfs; ++i) { + for (int i = 1; i != num_cfs; ++i) { exec_status[i].second = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; @@ -344,6 +344,20 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( break; } } + if (num_cfs > 1) { + TEST_SYNC_POINT( + "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:1"); + TEST_SYNC_POINT( + "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); + } + if (s.ok()) { + exec_status[0].second = + jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); + exec_status[0].first = true; + if (!exec_status[0].second.ok()) { + s = exec_status[0].second; + } + } } if (s.ok()) { @@ -428,9 +442,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } if (!s.IsShutdownInProgress()) { for (int i = 0; i != num_cfs; ++i) { - auto& mems = jobs[i].GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, - file_meta[i].fd.GetNumber()); + if (exec_status[i].first && exec_status[i].second.ok()) { + auto& mems = jobs[i].GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, + file_meta[i].fd.GetNumber()); + } } Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);