diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index ab4b1ab4c..3bacb513f 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -9,6 +9,7 @@ #include "db/db_test_util.h" #include "port/stack_trace.h" +#include "util/fault_injection_test_env.h" #include "util/sync_point.h" namespace rocksdb { @@ -47,6 +48,38 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) { #endif // ROCKSDB_LITE } +TEST_F(DBFlushTest, SyncFail) { + std::unique_ptr fault_injection_env( + new FaultInjectionTestEnv(Env::Default())); + Options options; + options.disable_auto_compactions = true; + options.env = fault_injection_env.get(); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"}, + {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + Reopen(options); + Put("key", "value"); + auto* cfd = + reinterpret_cast(db_->DefaultColumnFamily()) + ->cfd(); + int refs_before = cfd->current()->TEST_refs(); + FlushOptions flush_options; + flush_options.wait = false; + ASSERT_OK(dbfull()->Flush(flush_options)); + fault_injection_env->SetFilesystemActive(false); + TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); + TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); + fault_injection_env->SetFilesystemActive(true); + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ("", FilesPerLevel()); // flush failed. + // Flush job should release ref count to current version. + ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); + Destroy(options); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index f884701cd..e8e83626b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1842,6 +1842,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, } Status DBImpl::SyncClosedLogs(JobContext* job_context) { + TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); autovector logs_to_sync; uint64_t current_log_number = logfile_number_; @@ -1878,6 +1879,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { MarkLogsSynced(current_log_number - 1, true, s); if (!s.ok()) { bg_error_ = s; + TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); return s; } } @@ -1928,6 +1930,8 @@ Status DBImpl::FlushMemTableToOutputFile( // is unlocked by the current thread. if (s.ok()) { s = flush_job.Run(&file_meta); + } else { + flush_job.Cancel(); } if (s.ok()) { @@ -2762,7 +2766,7 @@ void DBImpl::MarkLogsSynced( ++it; } } - assert(logs_.empty() || logs_[0].number > up_to || + assert(!status.ok() || logs_.empty() || logs_[0].number > up_to || (logs_.size() == 1 && !logs_[0].getting_synced)); log_sync_cv_.SignalAll(); } diff --git a/db/flush_job.cc b/db/flush_job.cc index 33c295a5f..e957fd7af 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -230,6 +230,12 @@ Status FlushJob::Run(FileMetaData* file_meta) { return s; } +void FlushJob::Cancel() { + db_mutex_->AssertHeld(); + assert(base_ != nullptr); + base_->Unref(); +} + Status FlushJob::WriteLevel0Table() { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_FLUSH_WRITE_L0); diff --git a/db/flush_job.h b/db/flush_job.h index 5a3229cb6..31672dd27 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -67,9 +67,11 @@ class FlushJob { ~FlushJob(); - // Require db_mutex held + // Require db_mutex held. + // Once PickMemTable() is called, either Run() or Cancel() has to be call. void PickMemTable(); Status Run(FileMetaData* file_meta = nullptr); + void Cancel(); TableProperties GetTableProperties() const { return table_properties_; } private: diff --git a/db/version_set.h b/db/version_set.h index 0d7b85e8c..08ed96201 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -520,6 +520,8 @@ class Version { return next_; } + int TEST_refs() const { return refs_; } + VersionStorageInfo* storage_info() { return &storage_info_; } VersionSet* version_set() { return vset_; } diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc index e0a39c5c7..dd41e5b07 100644 --- a/util/fault_injection_test_env.cc +++ b/util/fault_injection_test_env.cc @@ -149,7 +149,7 @@ Status TestWritableFile::Flush() { Status TestWritableFile::Sync() { if (!env_->IsFilesystemActive()) { - return Status::OK(); + return Status::IOError("FaultInjectionTestEnv: not active"); } // No need to actual sync. state_.pos_at_last_sync_ = state_.pos_;