diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index ed3a12e9f..5a24226ea 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -741,6 +741,35 @@ TEST_P(DBAtomicFlushTest, CFDropRaceWithWaitForFlushMemTables) { SyncPoint::GetInstance()->DisableProcessing(); } +TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) { + bool atomic_flush = GetParam(); + if (!atomic_flush) { + return; + } + auto fault_injection_env = std::make_shared(env_); + Options options = CurrentOptions(); + options.env = fault_injection_env.get(); + options.create_if_missing = true; + options.atomic_flush = atomic_flush; + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_EQ(2, handles_.size()); + for (size_t cf = 0; cf < handles_.size(); ++cf) { + ASSERT_OK(Put(static_cast(cf), "a", "value")); + } + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0", + [&](void* /*arg*/) { fault_injection_env->SetFilesystemActive(false); }); + SyncPoint::GetInstance()->EnableProcessing(); + FlushOptions flush_opts; + Status s = db_->Flush(flush_opts, handles_); + ASSERT_NOK(s); + fault_injection_env->SetFilesystemActive(true); + Close(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, testing::Bool()); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index cfd07c118..510cfd12b 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -409,7 +409,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = Status::OK(); } - if (s.ok() || s.IsShutdownInProgress() || s.IsColumnFamilyDropped()) { + if (s.ok() || s.IsShutdownInProgress()) { // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { @@ -420,6 +420,23 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( } } } + } else { + // Need to undo atomic flush if something went wrong, i.e. s is not OK and + // it is not because of CF drop. + // Have to cancel the flush jobs that have NOT executed because we need to + // unref the versions. + for (int i = 0; i != num_cfs; ++i) { + if (!exec_status[i].first) { + jobs[i]->Cancel(); + } + } + for (int i = 0; i != num_cfs; ++i) { + if (exec_status[i].first && exec_status[i].second.ok()) { + auto& mems = jobs[i]->GetMemTables(); + cfds[i]->imm()->RollbackMemtableFlush(mems, + file_meta[i].fd.GetNumber()); + } + } } if (s.ok()) { @@ -526,23 +543,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #endif // ROCKSDB_LITE } - // Need to undo atomic flush if something went wrong, i.e. s is not OK and - // it is not because of CF drop. - if (!s.ok() && !s.IsColumnFamilyDropped()) { - // Have to cancel the flush jobs that have NOT executed because we need to - // unref the versions. - for (int i = 0; i != num_cfs; ++i) { - if (!exec_status[i].first) { - jobs[i]->Cancel(); - } - } - for (int i = 0; i != num_cfs; ++i) { - if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i]->GetMemTables(); - cfds[i]->imm()->RollbackMemtableFlush(mems, - file_meta[i].fd.GetNumber()); - } - } + if (!s.ok() && !s.IsShutdownInProgress()) { Status new_bg_error = s; error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); }