From f361cedf0656d55381a0cae2722e934198569f4e Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Fri, 7 Feb 2020 10:50:17 -0800 Subject: [PATCH] Atomic flush rollback once on failure (#6385) Summary: Before this fix, atomic flush codepath may hit an assertion failure on a specific failure case. If all flush jobs within an atomic flush succeed (they do not write to MANIFEST), but batch writing version edits to MANIFEST fails, then `cfd->imm()->RollbackMemTableFlush()` will be called twice, and the second invocation hits assertion failure `assert(m->flush_in_progress_)` since the first invocation resets the variable `flush_in_progress_` to false already. Test plan (dev server): ``` ./db_flush_test --gtest_filter=DBAtomicFlushTest/DBAtomicFlushTest.RollbackAfterFailToInstallResults make check ``` Both must succeed. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6385 Differential Revision: D19782943 Pulled By: riversand963 fbshipit-source-id: 84e1592625e729d1b70fdc8479959387a74cb121 --- db/db_flush_test.cc | 29 ++++++++++++++++++++ db/db_impl/db_impl_compaction_flush.cc | 37 +++++++++++++------------- 2 files changed, 48 insertions(+), 18 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index ed3a12e9f..5a24226ea 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -741,6 +741,35 @@ TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) { SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + auto fault_injection_env = std::make_shared(env_); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + for (size_t cf = 0; cf < handles_.size(); ++cf) { + ASSERT_OK(Put(static_cast(cf), "a", "value")); + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0", + [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); }); + SyncPoint::GetInstance()->EnableProcessing(); + FlushOptions flush_opts; + Status s = db_->Flush(flush_opts, handles_); + ASSERT_NOK(s); + fault_injection_env->SetFilesystemActive(true); + Close(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index cfd07c118..510cfd12b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -409,7 +409,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = Status::OK(); } - if (s.ok() || s.IsShutdownInProgress() || s.IsColumnFamilyDropped()) { + if (s.ok() || s.IsShutdownInProgress()) { // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { @@ -420,6 +420,23 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } } } + } else { + // Need to undo atomic flush if something went wrong, i.e. s is not OK and + // it is not because of CF drop. + // Have to cancel the flush jobs that have NOT executed because we need to + // unref the versions. + for (int i = 0; i != num_cfs; ++i) { + if (!exec_status[i].first) { + jobs[i]->Cancel(); + } + } + for (int i = 0; i != num_cfs; ++i) { + if (exec_status[i].first && exec_status[i].second.ok()) { + auto& mems = jobs[i]->GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, + file_meta[i].fd.GetNumber()); + } + } } if (s.ok()) { @@ -526,23 +543,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #endif // ROCKSDB_LITE } - // Need to undo atomic flush if something went wrong, i.e. s is not OK and - // it is not because of CF drop. - if (!s.ok() && !s.IsColumnFamilyDropped()) { - // Have to cancel the flush jobs that have NOT executed because we need to - // unref the versions. - for (int i = 0; i != num_cfs; ++i) { - if (!exec_status[i].first) { - jobs[i]->Cancel(); - } - } - for (int i = 0; i != num_cfs; ++i) { - if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i]->GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, - file_meta[i].fd.GetNumber()); - } - } + if (!s.ok() && !s.IsShutdownInProgress()) { Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); }