diff --git a/HISTORY.md b/HISTORY.md
index 12250a579..f2f3fd170 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -24,6 +24,9 @@
 * Enable backward iteration on keys with user-defined timestamps.
 * Add statistics and info log for error handler: counters for bg error, bg io error, bg retryable io error, auto resume count, auto resume total retry number, and auto resume sucess; Histogram for auto resume retry count in each recovery call. Note that, each auto resume attempt will have one or multiple retries.
 
+### Behavior Changes
+* During flush, only a WAL sync retryable IO error is mapped to a hard error, which stalls writes. When the WAL is used but only the SST file write hits a retryable IO error, it is mapped to a soft error and writes are not affected.
+
 ## 6.18.0 (02/19/2021)
 ### Behavior Changes
 * When retryable IO error occurs during compaction, it is mapped to soft error and set the BG error. However, auto resume is not called to clean the soft error since compaction will reschedule by itself. In this change, When retryable IO error occurs during compaction, BG error is not set. User will be informed the error via EventHelper.
diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc
index 8eeafc56b..6cd170fb9 100644
--- a/db/db_flush_test.cc
+++ b/db/db_flush_test.cc
@@ -82,29 +82,16 @@ TEST_F(DBFlushTest, SyncFail) {
   options.env = fault_injection_env.get();
 
   SyncPoint::GetInstance()->LoadDependency(
-      {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
-        "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
-       {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
-        "DBFlushTest::SyncFail:GetVersionRefCount:2"},
-       {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
+      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
        {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
   SyncPoint::GetInstance()->EnableProcessing();
 
   CreateAndReopenWithCF({"pikachu"}, options);
   ASSERT_OK(Put("key", "value"));
-  auto* cfd =
-      static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
-          ->cfd();
   FlushOptions flush_options;
   flush_options.wait = false;
   ASSERT_OK(dbfull()->Flush(flush_options));
   // Flush installs a new super-version. Get the ref count after that.
-  auto current_before = cfd->current();
-  int refs_before = cfd->current()->TEST_refs();
-  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
-  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
-  int refs_after_picking_memtables = cfd->current()->TEST_refs();
-  ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
   fault_injection_env->SetFilesystemActive(false);
   TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
   TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
@@ -115,9 +102,6 @@ TEST_F(DBFlushTest, SyncFail) {
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("", FilesPerLevel());  // flush failed.
 #endif  // ROCKSDB_LITE
-  // Backgroun flush job should release ref count to current version.
-  ASSERT_EQ(current_before, cfd->current());
-  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
   Destroy(options);
 }
 
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 279cddd77..dec86c249 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -131,16 +131,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
       MarkLogsNotSynced(current_log_number - 1);
     }
     if (!io_s.ok()) {
-      if (total_log_size_ > 0) {
-        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
-      } else {
-        // If the WAL is empty, we use different error reason
-        error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
-      }
       TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
       return io_s;
     }
   }
+  TEST_SYNC_POINT("DBImpl::SyncClosedLogs:end");
   return io_s;
 }
 
@@ -170,17 +165,14 @@ Status DBImpl::FlushMemTableToOutputFile(
       &blob_callback_);
   FileMetaData file_meta;
 
-  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
-  flush_job.PickMemTable();
-  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
-
 #ifndef ROCKSDB_LITE
   // may temporarily unlock and lock the mutex.
   NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
 #endif  // ROCKSDB_LITE
 
   Status s;
-  IOStatus io_s = IOStatus::OK();
+  bool need_cancel = false;
+  IOStatus log_io_s = IOStatus::OK();
   if (logfile_number_ > 0 &&
       versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
     // If there are more than one column families, we need to make sure that
@@ -189,16 +181,25 @@ Status DBImpl::FlushMemTableToOutputFile(
     // flushed SST may contain data from write batches whose updates to
     // other column families are missing.
     // SyncClosedLogs() may unlock and re-lock the db_mutex.
-    io_s = SyncClosedLogs(job_context);
-    s = io_s;
-    if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
-        !io_s.IsColumnFamilyDropped()) {
-      error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
+    log_io_s = SyncClosedLogs(job_context);
+    s = log_io_s;
+    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
+        !log_io_s.IsColumnFamilyDropped()) {
+      error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
     }
   } else {
     TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
   }
 
+  // If the log sync failed, we do not need to pick the memtable. Otherwise,
+  // num_flush_not_started_ needs to be rolled back.
+  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
+  if (s.ok()) {
+    flush_job.PickMemTable();
+    need_cancel = true;
+  }
+  TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:AfterPickMemtables");
+
   // Within flush_job.Run, rocksdb may call event listener to notify
   // file creation and deletion.
   //
@@ -207,11 +208,16 @@ Status DBImpl::FlushMemTableToOutputFile(
   // is unlocked by the current thread.
   if (s.ok()) {
     s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
-  } else {
+    need_cancel = false;
+  }
+
+  if (!s.ok() && need_cancel) {
     flush_job.Cancel();
   }
-  if (io_s.ok()) {
-    io_s = flush_job.io_status();
+  IOStatus io_s = IOStatus::OK();
+  io_s = flush_job.io_status();
+  if (s.ok()) {
+    s = io_s;
   }
 
   if (s.ok()) {
@@ -247,6 +253,7 @@ Status DBImpl::FlushMemTableToOutputFile(
   if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
     if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
         !io_s.IsColumnFamilyDropped()) {
+      assert(log_io_s.ok());
       // Error while writing to MANIFEST.
       // In fact, versions_->io_status() can also be the result of renaming
       // CURRENT file. With current code, it's just difficult to tell. So just
@@ -261,15 +268,17 @@ Status DBImpl::FlushMemTableToOutputFile(
           error_handler_.SetBGError(io_s,
                                     BackgroundErrorReason::kManifestWriteNoWAL);
         }
-      } else if (total_log_size_ > 0) {
+      } else if (total_log_size_ > 0 || !log_io_s.ok()) {
         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
       } else {
         // If the WAL is empty, we use different error reason
         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
       }
     } else {
-      Status new_bg_error = s;
-      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
+      if (log_io_s.ok()) {
+        Status new_bg_error = s;
+        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
+      }
     }
   } else {
     // If we got here, then we decided not to care about the i_os status (either
@@ -402,12 +411,11 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         false /* sync_output_directory */, false /* write_manifest */,
         thread_pri, io_tracer_, db_id_, db_session_id_,
         cfd->GetFullHistoryTsLow()));
-    jobs.back()->PickMemTable();
   }
 
   std::vector<FileMetaData> file_meta(num_cfs);
   Status s;
-  IOStatus io_s;
+  IOStatus log_io_s = IOStatus::OK();
   assert(num_cfs == static_cast<int>(jobs.size()));
 
 #ifndef ROCKSDB_LITE
@@ -422,18 +430,36 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   if (logfile_number_ > 0) {
     // TODO (yanqin) investigate whether we should sync the closed logs for
     // single column family case.
-    io_s = SyncClosedLogs(job_context);
-    s = io_s;
+    log_io_s = SyncClosedLogs(job_context);
+    s = log_io_s;
+    if (!log_io_s.ok() && !log_io_s.IsShutdownInProgress() &&
+        !log_io_s.IsColumnFamilyDropped()) {
+      if (total_log_size_ > 0) {
+        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlush);
+      } else {
+        // If the WAL is empty, we use different error reason
+        error_handler_.SetBGError(log_io_s, BackgroundErrorReason::kFlushNoWAL);
+      }
+    }
   }
 
   // exec_status stores the execution status of flush_jobs as
   // <bool /* executed */, Status /* status code */>
   autovector<std::pair<bool, Status>> exec_status;
   autovector<IOStatus> io_status;
+  std::vector<bool> pick_status;
   for (int i = 0; i != num_cfs; ++i) {
     // Initially all jobs are not executed, with status OK.
     exec_status.emplace_back(false, Status::OK());
     io_status.emplace_back(IOStatus::OK());
+    pick_status.push_back(false);
+  }
+
+  if (s.ok()) {
+    for (int i = 0; i != num_cfs; ++i) {
+      jobs[i]->PickMemTable();
+      pick_status[i] = true;
+    }
   }
 
   if (s.ok()) {
@@ -474,6 +500,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
       s = error_status.ok() ? s : error_status;
   }
 
+  IOStatus io_s = IOStatus::OK();
   if (io_s.ok()) {
     IOStatus io_error = IOStatus::OK();
     for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
@@ -509,7 +536,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
     // Have to cancel the flush jobs that have NOT executed because we need to
     // unref the versions.
     for (int i = 0; i != num_cfs; ++i) {
-      if (!exec_status[i].first) {
+      if (pick_status[i] && !exec_status[i].first) {
         jobs[i]->Cancel();
       }
     }
@@ -653,6 +680,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
   // it is not because of CF drop.
   if (!s.ok() && !s.IsColumnFamilyDropped()) {
     if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
+      assert(log_io_s.ok());
       // Error while writing to MANIFEST.
       // In fact, versions_->io_status() can also be the result of renaming
       // CURRENT file. With current code, it's just difficult to tell. So just
@@ -674,8 +702,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
         error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
       }
     } else {
-      Status new_bg_error = s;
-      error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
+      if (log_io_s.ok()) {
+        Status new_bg_error = s;
+        error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
+      }
     }
   }
 
diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc
index 9c9940740..ab15fd134 100644
--- a/db/error_handler_fs_test.cc
+++ b/db/error_handler_fs_test.cc
@@ -351,7 +351,96 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteFileScopeError) {
   Destroy(options);
 }
 
-TEST_F(DBErrorHandlingFSTest, FLushWriteNoWALRetryableError1) {
+TEST_F(DBErrorHandlingFSTest, FLushWALWriteRetryableError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  Status s;
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
+
+  listener->EnableAutoRecovery(false);
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:Start",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
+
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = false;
+  ASSERT_OK(Put(Key(1), "val1", wo));
+
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  auto cfh = dbfull()->GetColumnFamilyHandle(1);
+  s = dbfull()->DropColumnFamily(cfh);
+
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  ASSERT_EQ("val1", Get(Key(1)));
+  ASSERT_OK(Put(Key(3), "val3", wo));
+  ASSERT_EQ("val3", Get(Key(3)));
+  s = Flush();
+  ASSERT_OK(s);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FLushWALAtomicWriteRetryableError) {
+  std::shared_ptr<ErrorHandlerFSListener> listener(
+      new ErrorHandlerFSListener());
+  Options options = GetDefaultOptions();
+  options.env = fault_env_.get();
+  options.create_if_missing = true;
+  options.listeners.emplace_back(listener);
+  options.max_bgerror_resume_count = 0;
+  options.atomic_flush = true;
+  Status s;
+
+  IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+  error_msg.SetRetryable(true);
+
+  listener->EnableAutoRecovery(false);
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:Start",
+      [&](void*) { fault_fs_->SetFilesystemActive(false, error_msg); });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  CreateAndReopenWithCF({"pikachu, sdfsdfsdf"}, options);
+
+  WriteOptions wo = WriteOptions();
+  wo.disableWAL = false;
+  ASSERT_OK(Put(Key(1), "val1", wo));
+
+  s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  fault_fs_->SetFilesystemActive(true);
+  auto cfh = dbfull()->GetColumnFamilyHandle(1);
+  s = dbfull()->DropColumnFamily(cfh);
+
+  s = dbfull()->Resume();
+  ASSERT_OK(s);
+  ASSERT_EQ("val1", Get(Key(1)));
+  ASSERT_OK(Put(Key(3), "val3", wo));
+  ASSERT_EQ("val3", Get(Key(3)));
+  s = Flush();
+  ASSERT_OK(s);
+  ASSERT_EQ("val3", Get(Key(3)));
+
+  Destroy(options);
+}
+
+TEST_F(DBErrorHandlingFSTest, FLushWritNoWALRetryableError1) {
   std::shared_ptr<ErrorHandlerFSListener> listener(
       new ErrorHandlerFSListener());
   Options options = GetDefaultOptions();