diff --git a/HISTORY.md b/HISTORY.md index bec7a56a7..62e2cdcd2 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -25,6 +25,7 @@ * DB identity (`db_id`) and DB session identity (`db_session_id`) are added to table properties and stored in SST files. SST files generated from SstFileWriter and Repairer have DB identity “SST Writer” and “DB Repairer”, respectively. Their DB session IDs are generated in the same way as `DB::GetDbSessionId`. The session ID for SstFileWriter (resp., Repairer) resets every time `SstFileWriter::Open` (resp., `Repairer::Run`) is called. * Added experimental option BlockBasedTableOptions::optimize_filters_for_memory for reducing allocated memory size of Bloom filters (~10% savings with Jemalloc) while preserving the same general accuracy. To have an effect, the option requires format_version=5 and malloc_usable_size. Enabling this option is forward and backward compatible with existing format_version=5. * `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is added, where `BackupTableNameOption` is an `enum` type with two enumerators `kChecksumAndFileSize` and `kChecksumAndDbSessionId`. By default, `BackupTableNameOption BackupableDBOptions::share_files_with_checksum_naming` is set to `kChecksumAndDbSessionId`. In this default case, backup table filenames are of the form `__.sst` as opposed to `__.sst`. The new default behavior fixes the backup file name collision problem, which might be possible at large scale, but the option `kChecksumAndFileSize` is added to allow use of old naming in case it is needed. This default behavior change is not an upgrade issue, because previous versions of RocksDB can read, restore, and delete backups using new names, and it's OK for a backup directory to use a mixture of table file naming schemes. Note that `share_files_with_checksum_naming` comes into effect only when both `share_files_with_checksum` and `share_table_files` are true. 
+* Added auto resume function to automatically recover the DB from background Retryable IO Error. When a retryable IO Error happens during flush and WAL write, the error is mapped to Hard Error and the DB will be in read mode. When a retryable IO Error happens during compaction, the error will be mapped to Soft Error and the DB is still in write/read mode. The auto resume function will create a thread for a DB to call DB->ResumeImpl() to try to recover from a Retryable IO Error during flush and WAL write. Compaction will be rescheduled by itself if a retryable IO Error happens. Auto resume may itself hit another Retryable IO Error during the recovery, in which case the recovery will fail. Retrying the auto resume may solve the issue, so we use max_bgerror_resume_count to decide how many resume cycles will be tried in total. If it is <=0, auto resume from retryable IO Error is disabled. Default is INT_MAX, which will lead to an infinite auto resume. bgerror_resume_retry_interval decides the time interval between two auto resumes. ### Bug Fixes * Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. diff --git a/db/builder.cc b/db/builder.cc index 6c89562e0..eb3ab5422 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -94,6 +94,7 @@ Status BuildTable( // Reports the IOStats for flush for every following bytes. 
const size_t kReportFlushIOStatsEvery = 1048576; Status s; + IOStatus io_s; meta->fd.file_size = 0; iter->SeekToFirst(); std::unique_ptr range_del_agg( @@ -124,7 +125,11 @@ Status BuildTable( bool use_direct_writes = file_options.use_direct_writes; TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes); #endif // !NDEBUG - s = NewWritableFile(fs, fname, &file, file_options); + io_s = NewWritableFile(fs, fname, &file, file_options); + s = io_s; + if (io_status->ok()) { + *io_status = io_s; + } if (!s.ok()) { EventHelpers::LogAndNotifyTableFileCreationFinished( event_logger, ioptions.listeners, dbname, column_family_name, fname, @@ -193,7 +198,10 @@ Status BuildTable( } else { s = builder->Finish(); } - *io_status = builder->io_status(); + io_s = builder->io_status(); + if (io_status->ok()) { + *io_status = io_s; + } if (s.ok() && !empty) { uint64_t file_size = builder->FileSize(); @@ -212,16 +220,16 @@ Status BuildTable( StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS); *io_status = file_writer->Sync(ioptions.use_fsync); } - if (io_status->ok() && !empty) { + if (s.ok() && io_status->ok() && !empty) { *io_status = file_writer->Close(); } - if (io_status->ok() && !empty) { + if (s.ok() && io_status->ok() && !empty) { // Add the checksum information to file metadata. 
meta->file_checksum = file_writer->GetFileChecksum(); meta->file_checksum_func_name = file_writer->GetFileChecksumFuncName(); } - if (!io_status->ok()) { + if (s.ok()) { s = *io_status; } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 249fb93d8..9eaa76f08 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -116,6 +116,9 @@ struct CompactionJob::SubcompactionState { // The return status of this subcompaction Status status; + // The return IO Status of this subcompaction + IOStatus io_status; + // Files produced by this subcompaction struct Output { FileMetaData meta; @@ -179,6 +182,7 @@ struct CompactionJob::SubcompactionState { start = std::move(o.start); end = std::move(o.end); status = std::move(o.status); + io_status = std::move(o.io_status); outputs = std::move(o.outputs); outfile = std::move(o.outfile); builder = std::move(o.builder); @@ -609,22 +613,26 @@ Status CompactionJob::Run() { // Check if any thread encountered an error during execution Status status; + IOStatus io_s; for (const auto& state : compact_->sub_compact_states) { if (!state.status.ok()) { status = state.status; + io_s = state.io_status; break; } } - - IOStatus io_s; + if (io_status_.ok()) { + io_status_ = io_s; + } if (status.ok() && output_directory_) { io_s = output_directory_->Fsync(IOOptions(), nullptr); } - if (!io_s.ok()) { + if (io_status_.ok()) { io_status_ = io_s; + } + if (status.ok()) { status = io_s; } - if (status.ok()) { thread_pool.clear(); std::vector files_meta; @@ -1210,6 +1218,7 @@ Status CompactionJob::FinishCompactionOutputFile( } else { it->SeekToFirst(); } + TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); for (; it->Valid(); it->Next()) { auto tombstone = it->Tombstone(); if (upper_bound != nullptr) { @@ -1310,9 +1319,9 @@ Status CompactionJob::FinishCompactionOutputFile( } else { sub_compact->builder->Abandon(); } - if (!sub_compact->builder->io_status().ok()) { - io_status_ = 
sub_compact->builder->io_status(); - s = io_status_; + IOStatus io_s = sub_compact->builder->io_status(); + if (s.ok()) { + s = io_s; } const uint64_t current_bytes = sub_compact->builder->FileSize(); if (s.ok()) { @@ -1322,24 +1331,25 @@ Status CompactionJob::FinishCompactionOutputFile( sub_compact->total_bytes += current_bytes; // Finish and check for file errors - IOStatus io_s; if (s.ok()) { StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); io_s = sub_compact->outfile->Sync(db_options_.use_fsync); } - if (io_s.ok()) { + if (s.ok() && io_s.ok()) { io_s = sub_compact->outfile->Close(); } - if (io_s.ok()) { + if (s.ok() && io_s.ok()) { // Add the checksum information to file metadata. meta->file_checksum = sub_compact->outfile->GetFileChecksum(); meta->file_checksum_func_name = sub_compact->outfile->GetFileChecksumFuncName(); } - if (!io_s.ok()) { - io_status_ = io_s; + if (s.ok()) { s = io_s; } + if (sub_compact->io_status.ok()) { + sub_compact->io_status = io_s; + } sub_compact->outfile.reset(); TableProperties tp; @@ -1488,7 +1498,12 @@ Status CompactionJob::OpenCompactionOutputFile( TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", &syncpoint_arg); #endif - Status s = NewWritableFile(fs_, fname, &writable_file, file_options_); + Status s; + IOStatus io_s = NewWritableFile(fs_, fname, &writable_file, file_options_); + s = io_s; + if (sub_compact->io_status.ok()) { + sub_compact->io_status = io_s; + } if (!s.ok()) { ROCKS_LOG_ERROR( db_options_.info_log, diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 51f513832..a6b42f7b4 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1710,8 +1710,8 @@ class DBImpl : public DB { size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } - Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, - size_t preallocate_block_size, log::Writer** new_log); + IOStatus 
CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, + size_t preallocate_block_size, log::Writer** new_log); // Validate self-consistency of DB options static Status ValidateOptions(const DBOptions& db_options); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index cfb97be8a..f70600fb5 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -194,7 +194,9 @@ Status DBImpl::FlushMemTableToOutputFile( } else { flush_job.Cancel(); } - io_s = flush_job.io_status(); + if (io_s.ok()) { + io_s = flush_job.io_status(); + } if (s.ok()) { InstallSuperVersionAndScheduleWork(cfd, superversion_context, @@ -1107,7 +1109,12 @@ Status DBImpl::CompactFilesImpl( "[%s] [JOB %d] Compaction error: %s", c->column_family_data()->GetName().c_str(), job_context->job_id, status.ToString().c_str()); - error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + IOStatus io_s = compaction_job.io_status(); + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction); + } else { + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + } } if (output_file_names != nullptr) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index e34d4f3e3..058d54246 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1388,9 +1388,10 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, !kSeqPerBatch, kBatchPerTxn); } -Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, - size_t preallocate_block_size, log::Writer** new_log) { - Status s; +IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, + size_t preallocate_block_size, + log::Writer** new_log) { + IOStatus io_s; std::unique_ptr lfile; DBOptions db_options = @@ -1408,13 +1409,13 @@ Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, 
LogFileName(immutable_db_options_.wal_dir, recycle_log_number); TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1"); TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2"); - s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options, - &lfile, /*dbg=*/nullptr); + io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options, + &lfile, /*dbg=*/nullptr); } else { - s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options); + io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options); } - if (s.ok()) { + if (io_s.ok()) { lfile->SetWriteLifeTimeHint(CalculateWALWriteHint()); lfile->SetPreallocationBlockSize(preallocate_block_size); @@ -1426,7 +1427,7 @@ Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush); } - return s; + return io_s; } Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index b3861c32f..9933d26a0 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1638,6 +1638,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { std::unique_ptr lfile; log::Writer* new_log = nullptr; MemTable* new_mem = nullptr; + IOStatus io_s; // Recoverable state is persisted in WAL. After memtable switch, WAL might // be deleted, so we write the state to memtable to be persisted as well. @@ -1683,8 +1684,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (creating_new_log) { // TODO: Write buffer size passed in should be max of all CF's instead // of mutable_cf_options.write_buffer_size. 
- s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size, - &new_log); + io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size, + &new_log); + if (s.ok()) { + s = io_s; + } } if (s.ok()) { SequenceNumber seq = versions_->LastSequence(); @@ -1710,7 +1714,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; - s = cur_log_writer->WriteBuffer(); + io_s = cur_log_writer->WriteBuffer(); + if (s.ok()) { + s = io_s; + } if (!s.ok()) { ROCKS_LOG_WARN(immutable_db_options_.info_log, "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 @@ -1745,7 +1752,11 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } // We may have lost data from the WritableFileBuffer in-memory buffer for // the current log, so treat it as a fatal error and set bg_error - error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable); + } else { + error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); + } // Read back bg_error in order to get the right severity s = error_handler_.GetBGError(); return s; diff --git a/db/error_handler.cc b/db/error_handler.cc index 1f7bbd7ec..e344e9931 100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -154,6 +154,10 @@ void ErrorHandler::CancelErrorRecovery() { recovery_in_prog_ = false; } } + + // If auto recovery is also runing to resume from the retryable error, + // we should wait and end the auto recovery. 
+ EndAutoRecovery(); #endif } @@ -177,7 +181,6 @@ void ErrorHandler::CancelErrorRecovery() { // end whether recovery succeeded or not Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) { db_mutex_->AssertHeld(); - if (bg_err.ok()) { return Status::OK(); } @@ -260,8 +263,11 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, if (bg_io_err.ok()) { return Status::OK(); } - if (recovery_in_prog_ && recovery_error_.ok()) { - recovery_error_ = bg_io_err; + ROCKS_LOG_WARN(db_options_.info_log, "Background IO error %s", + bg_io_err.ToString().c_str()); + + if (recovery_in_prog_ && recovery_io_error_.ok()) { + recovery_io_error_ = bg_io_err; } if (BackgroundErrorReason::kManifestWrite == reason) { // Always returns ok @@ -275,6 +281,9 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, bool auto_recovery = false; Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError); bg_error_ = bg_err; + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_err; + } EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_, &auto_recovery); return bg_error_; @@ -282,16 +291,28 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, // Second, check if the error is a retryable IO error or not. if it is // retryable error and its severity is higher than bg_error_, overwrite // the bg_error_ with new error. - // In current stage, treat retryable error as HardError. No automatic - // recovery. + // In current stage, for retryable IO error of compaction, treat it as + // soft error. In other cases, treat the retryable IO error as hard + // error. 
bool auto_recovery = false; - Status bg_err(new_bg_io_err, Status::Severity::kHardError); EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_, &auto_recovery); - if (bg_err.severity() > bg_error_.severity()) { - bg_error_ = bg_err; + if (BackgroundErrorReason::kCompaction == reason) { + Status bg_err(new_bg_io_err, Status::Severity::kSoftError); + if (bg_err.severity() > bg_error_.severity()) { + bg_error_ = bg_err; + } + return bg_error_; + } else { + Status bg_err(new_bg_io_err, Status::Severity::kHardError); + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_err; + } + if (bg_err.severity() > bg_error_.severity()) { + bg_error_ = bg_err; + } + return StartRecoverFromRetryableBGIOError(bg_io_err); } - return bg_error_; } else { s = SetBGError(new_bg_io_err, reason); } @@ -401,4 +422,122 @@ Status ErrorHandler::RecoverFromBGError(bool is_manual) { return bg_error_; #endif } + +Status ErrorHandler::StartRecoverFromRetryableBGIOError(IOStatus io_error) { +#ifndef ROCKSDB_LITE + db_mutex_->AssertHeld(); + if (bg_error_.ok() || io_error.ok()) { + return Status::OK(); + } + if (db_options_.max_bgerror_resume_count <= 0 || recovery_in_prog_ || + recovery_thread_) { + // Auto resume BG error is not enabled, directly return bg_error_. + return bg_error_; + } + + recovery_in_prog_ = true; + recovery_thread_.reset( + new port::Thread(&ErrorHandler::RecoverFromRetryableBGIOError, this)); + + if (recovery_io_error_.ok() && recovery_error_.ok()) { + return Status::OK(); + } else { + TEST_SYNC_POINT("StartRecoverRetryableBGIOError:RecoverFail"); + return bg_error_; + } +#else + (void)io_error; + return bg_error_; +#endif +} + +// Automatic recover from Retryable BG IO error. Must be called after db +// mutex is released. 
+void ErrorHandler::RecoverFromRetryableBGIOError() { +#ifndef ROCKSDB_LITE + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart"); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart1"); + InstrumentedMutexLock l(db_mutex_); + if (end_recovery_) { + return; + } + int resume_count = db_options_.max_bgerror_resume_count; + uint64_t wait_interval = db_options_.bgerror_resume_retry_interval; + // Recover from the retryable error. Create a separate thread to do it. + while (resume_count > 0) { + if (end_recovery_) { + return; + } + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeResume0"); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeResume1"); + recovery_io_error_ = IOStatus::OK(); + recovery_error_ = Status::OK(); + Status s = db_->ResumeImpl(); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume0"); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterResume1"); + if (s.IsShutdownInProgress() || + bg_error_.severity() >= Status::Severity::kFatalError) { + // If DB shutdown in progress or the error severity is higher than + // Hard Error, stop auto resume and returns. + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverFail0"); + recovery_in_prog_ = false; + return; + } + if (!recovery_io_error_.ok() && + recovery_error_.severity() <= Status::Severity::kHardError && + recovery_io_error_.GetRetryable()) { + // If new BG IO error happens during auto recovery and it is retryable + // and its severity is Hard Error or lower, the auto resmue sleep for + // a period of time and redo auto resume if it is allowed. 
+ TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait0"); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait1"); + int64_t wait_until = db_->env_->NowMicros() + wait_interval; + cv_.TimedWait(wait_until); + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterWait0"); + } else { + // There are three possibility: 1) recover_io_error is set during resume + // and the error is not retryable, 2) recover is successful, 3) other + // error happens during resume and cannot be resumed here. + if (recovery_io_error_.ok() && recovery_error_.ok() && s.ok()) { + // recover from the retryable IO error and no other BG errors. Clean + // the bg_error and notify user. + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverSuccess"); + Status old_bg_error = bg_error_; + bg_error_ = Status::OK(); + EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners, + old_bg_error, db_mutex_); + recovery_in_prog_ = false; + return; + } else { + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:RecoverFail1"); + // In this case: 1) recovery_io_error is more serious or not retryable + // 2) other Non IO recovery_error happens. The auto recovery stops. 
+ recovery_in_prog_ = false; + return; + } + } + resume_count--; + } + recovery_in_prog_ = false; + TEST_SYNC_POINT("RecoverFromRetryableBGIOError:LoopOut"); + return; +#else + return; +#endif +} + +void ErrorHandler::EndAutoRecovery() { + db_mutex_->AssertHeld(); + if (!end_recovery_) { + end_recovery_ = true; + } + cv_.SignalAll(); + db_mutex_->Unlock(); + if (recovery_thread_) { + recovery_thread_->join(); + } + db_mutex_->Lock(); + return; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/error_handler.h b/db/error_handler.h index 535ed675f..a2f1a7ec0 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -22,6 +22,10 @@ class ErrorHandler { db_options_(db_options), bg_error_(Status::OK()), recovery_error_(Status::OK()), + recovery_io_error_(IOStatus::OK()), + cv_(db_mutex), + end_recovery_(false), + recovery_thread_(nullptr), db_mutex_(db_mutex), auto_recovery_(false), recovery_in_prog_(false) {} @@ -59,6 +63,8 @@ class ErrorHandler { Status RecoverFromBGError(bool is_manual = false); void CancelErrorRecovery(); + void EndAutoRecovery(); + private: DBImpl* db_; const ImmutableDBOptions& db_options_; @@ -66,6 +72,15 @@ class ErrorHandler { // A separate Status variable used to record any errors during the // recovery process from hard errors Status recovery_error_; + // A separate IO Status variable used to record any IO errors during + // the recovery process. At the same time, recovery_error_ is also set. + IOStatus recovery_io_error_; + // The condition variable used with db_mutex during auto resume for time + // wait. 
+ InstrumentedCondVar cv_; + bool end_recovery_; + std::unique_ptr recovery_thread_; + InstrumentedMutex* db_mutex_; // A flag indicating whether automatic recovery from errors is enabled bool auto_recovery_; @@ -73,6 +88,8 @@ class ErrorHandler { Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery); void RecoverFromNoSpace(); + Status StartRecoverFromRetryableBGIOError(IOStatus io_error); + void RecoverFromRetryableBGIOError(); }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc index a944c6721..cf1d7189a 100644 --- a/db/error_handler_fs_test.cc +++ b/db/error_handler_fs_test.cc @@ -192,6 +192,7 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) { options.env = fault_fs_env.get(); options.create_if_missing = true; options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 0; Status s; listener->EnableAutoRecovery(false); @@ -298,6 +299,7 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) { options.env = fault_fs_env.get(); options.create_if_missing = true; options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 0; Status s; std::string old_manifest; std::string new_manifest; @@ -467,6 +469,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) { options.create_if_missing = true; options.level0_file_num_compaction_trigger = 2; options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 0; Status s; std::string old_manifest; std::string new_manifest; @@ -585,6 +588,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) { options.create_if_missing = true; options.level0_file_num_compaction_trigger = 2; options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 0; Status s; DestroyAndReopen(options); @@ -602,7 +606,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) { {{"DBImpl::FlushMemTable:FlushMemTableFinished", "BackgroundCallCompaction:0"}}); 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( - "BackgroundCallCompaction:0", + "CompactionJob::OpenCompactionOutputFile", [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); @@ -611,7 +615,7 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) { ASSERT_EQ(s, Status::OK()); s = dbfull()->TEST_WaitForCompact(); - ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); fault_fs->SetFilesystemActive(true); SyncPoint::GetInstance()->ClearAllCallBacks(); @@ -808,6 +812,7 @@ TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) { options.writable_file_max_buffer_size = 32768; options.listeners.emplace_back(listener); options.paranoid_checks = true; + options.max_bgerror_resume_count = 0; Status s; Random rnd(301); @@ -1206,6 +1211,765 @@ TEST_F(DBErrorHandlingFSTest, MultiDBVariousErrors) { options.clear(); delete def_env; } + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover1) { + // Fail the first resume and make the second resume successful + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeWait0", + "FLushWritRetryableeErrorAutoRecover1:0"}, + 
{"FLushWritRetryableeErrorAutoRecover1:1", + "RecoverFromRetryableBGIOError:BeforeWait1"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "FLushWritRetryableeErrorAutoRecover1:2"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:0"); + fault_fs->SetFilesystemActive(true); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:1"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover1:2"); + SyncPoint::GetInstance()->DisableProcessing(); + + ASSERT_EQ("val1", Get(Key(1))); + Reopen(options); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover2) { + // Activate the FS before the first resume + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeStart", + "FLushWritRetryableeErrorAutoRecover2:0"}, + 
{"FLushWritRetryableeErrorAutoRecover2:1", + "RecoverFromRetryableBGIOError:BeforeStart1"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "FLushWritRetryableeErrorAutoRecover2:2"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:0"); + fault_fs->SetFilesystemActive(true); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:1"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover2:2"); + SyncPoint::GetInstance()->DisableProcessing(); + + ASSERT_EQ("val1", Get(Key(1))); + Reopen(options); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover3) { + // Fail all the resume and let user to resume + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"FLushWritRetryableeErrorAutoRecover3:0", + "RecoverFromRetryableBGIOError:BeforeStart"}, + 
{"RecoverFromRetryableBGIOError:LoopOut", + "FLushWritRetryableeErrorAutoRecover3:1"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:0"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover3:1"); + fault_fs->SetFilesystemActive(true); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + + ASSERT_EQ("val1", Get(Key(1))); + // Auto resume fails due to FS does not recover during resume. User call + // resume manually here. + s = dbfull()->Resume(); + ASSERT_EQ("val1", Get(Key(1))); + ASSERT_EQ(s, Status::OK()); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover4) { + // Fail the first resume and does not do resume second time because + // the IO error severity is Fatal Error and not Retryable. 
+ std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 10; // 0.1 second + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error"); + nr_msg.SetRetryable(false); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeStart", + "FLushWritRetryableeErrorAutoRecover4:0"}, + {"FLushWritRetryableeErrorAutoRecover4:2", + "RecoverFromRetryableBGIOError:RecoverFail0"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->SetCallBack( + "RecoverFromRetryableBGIOError:BeforeResume1", + [&](void*) { fault_fs->SetFilesystemActive(false, nr_msg); }); + + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:0"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover4:2"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + // Even the FS is recoverd, due to the Fatal Error in bg_error_ the resume + // and flush will all fail. 
+ ASSERT_EQ("val1", Get(Key(1))); + s = dbfull()->Resume(); + ASSERT_NE(s, Status::OK()); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_NE(s, Status::OK()); + ASSERT_EQ("NOT_FOUND", Get(Key(2))); + + Reopen(options); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover5) { + // During the resume, call DB->Close, make sure the resume thread exits + // before close continues. Due to the shutdown, the resume is not successful + // and the FS does not become active, so close status is still IO error + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 10; // 10 microseconds + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error"); + nr_msg.SetRetryable(false); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeStart", + "FLushWritRetryableeErrorAutoRecover5:0"}}); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover5:0"); + // The first 
resume will cause recovery_error and its severity is the + // Fatal error + s = dbfull()->Close(); + ASSERT_NE(s, Status::OK()); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + + Reopen(options); + ASSERT_NE("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeErrorAutoRecover6) { + // During the resume, the FS is reactivated and then DB->Close is called. + // Unlike the previous test, the FS is active again before the close, so the + // close status is expected to be OK and the data survives the reopen. + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 10; // 10 microseconds + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + IOStatus nr_msg = IOStatus::IOError("No Retryable Fatal IO Error"); + nr_msg.SetRetryable(false); + + Put(Key(1), "val1"); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"FLushWritRetryableeErrorAutoRecover6:0", + "RecoverFromRetryableBGIOError:BeforeStart"}, + {"RecoverFromRetryableBGIOError:BeforeWait0", + "FLushWritRetryableeErrorAutoRecover6:1"}, + {"FLushWritRetryableeErrorAutoRecover6:2", + "RecoverFromRetryableBGIOError:BeforeWait1"}, + {"RecoverFromRetryableBGIOError:AfterWait0", + "FLushWritRetryableeErrorAutoRecover6:3"}}); + 
SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:0"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:1"); + fault_fs->SetFilesystemActive(true); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:2"); + TEST_SYNC_POINT("FLushWritRetryableeErrorAutoRecover6:3"); + // The FS has been reactivated above and no Fatal error is injected in this + // test, so Close() is expected to return OK. + s = dbfull()->Close(); + ASSERT_EQ(s, Status::OK()); + SyncPoint::GetInstance()->DisableProcessing(); + + Reopen(options); + ASSERT_EQ("val1", Get(Key(1))); + Put(Key(2), "val2"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ("val2", Get(Key(2))); + + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableErrorAutoRecover) { + // Fail the first resume and let the second resume be successful + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + std::string old_manifest; + std::string new_manifest; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + old_manifest = GetManifestNameFromLiveFiles(); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(0), "val"); + Flush(); + Put(Key(1), "val"); + 
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeStart", + "ManifestWriteRetryableErrorAutoRecover:0"}, + {"ManifestWriteRetryableErrorAutoRecover:1", + "RecoverFromRetryableBGIOError:BeforeWait1"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "ManifestWriteRetryableErrorAutoRecover:2"}}); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:0"); + fault_fs->SetFilesystemActive(true); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:1"); + TEST_SYNC_POINT("ManifestWriteRetryableErrorAutoRecover:2"); + SyncPoint::GetInstance()->DisableProcessing(); + + new_manifest = GetManifestNameFromLiveFiles(); + ASSERT_NE(new_manifest, old_manifest); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + Close(); +} + +TEST_F(DBErrorHandlingFSTest, + CompactionManifestWriteRetryableErrorAutoRecover) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + std::string old_manifest; + std::string new_manifest; + std::atomic fail_manifest(false); + DestroyAndReopen(options); + old_manifest = GetManifestNameFromLiveFiles(); + + IOStatus error_msg = 
IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(0), "val"); + Put(Key(2), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError)); + listener->EnableAutoRecovery(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + // Wait for flush of 2nd L0 file before starting compaction + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}, + // Wait for compaction to detect manifest write error + {"BackgroundCallCompaction:1", "CompactionManifestWriteErrorAR:0"}, + // Make compaction thread wait for error to be cleared + {"CompactionManifestWriteErrorAR:1", + "DBImpl::BackgroundCallCompaction:FoundObsoleteFiles"}, + {"CompactionManifestWriteErrorAR:2", + "RecoverFromRetryableBGIOError:BeforeStart"}, + // Fail the first resume, before the wait in resume + {"RecoverFromRetryableBGIOError:BeforeResume0", + "CompactionManifestWriteErrorAR:3"}, + // Activate the FS before the second resume + {"CompactionManifestWriteErrorAR:4", + "RecoverFromRetryableBGIOError:BeforeResume1"}, + // Wait for the auto resume to be successful + {"RecoverFromRetryableBGIOError:RecoverSuccess", + "CompactionManifestWriteErrorAR:5"}}); + // trigger manifest write failure in compaction thread + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { fail_manifest.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", [&](void*) { + if (fail_manifest.load()) { + fault_fs->SetFilesystemActive(false, error_msg); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:0"); + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:1"); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), 
ROCKSDB_NAMESPACE::Status::Severity::kHardError); + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:2"); + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:3"); + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:4"); + TEST_SYNC_POINT("CompactionManifestWriteErrorAR:5"); + SyncPoint::GetInstance()->DisableProcessing(); + + new_manifest = GetManifestNameFromLiveFiles(); + ASSERT_NE(new_manifest, old_manifest); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + ASSERT_EQ("val", Get(Key(2))); + Close(); +} + +TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableErrorAutoRecover) { + // In this test, in the first round of compaction, the FS is set to error. + // So the first compaction fails due to retryable IO error and it is mapped + // to soft error. Then, compaction is rescheduled, in the second round of + // compaction, the FS is set to active and compaction is successful, so + // the test will hit the CompactionJob::FinishCompactionOutputFile1 sync + // point. 
+ std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); + Status s; + std::atomic fail_first(false); + std::atomic fail_second(true); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError)); + listener->EnableAutoRecovery(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}, + {"CompactionJob::FinishCompactionOutputFile1", + "CompactionWriteRetryableErrorAutoRecover0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:Start", + [&](void*) { fault_fs->SetFilesystemActive(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::OpenCompactionOutputFile", [&](void*) { + if (fail_first.load() && fail_second.load()) { + fault_fs->SetFilesystemActive(false, error_msg); + fail_second.store(false); + } + }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError); + + TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0"); + SyncPoint::GetInstance()->ClearAllCallBacks(); + 
SyncPoint::GetInstance()->DisableProcessing(); + Destroy(options); +} + +TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover1) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.listeners.emplace_back(listener); + options.paranoid_checks = true; + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + Random rnd(301); + + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + // For the first batch, write is successful, require sync + { + WriteBatch batch; + + for (auto i = 0; i < 100; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + // For the second batch, the first 2 file Append are successful, then the + // following Append fails due to file system retryable IOError. 
+ { + WriteBatch batch; + int write_error = 0; + + for (auto i = 100; i < 200; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeResume0", "WALWriteError1:0"}, + {"WALWriteError1:1", "RecoverFromRetryableBGIOError:BeforeResume1"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError1:2"}}); + + SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_fs->SetFilesystemActive(false, error_msg); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(true, s.IsIOError()); + + TEST_SYNC_POINT("WALWriteError1:0"); + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("WALWriteError1:1"); + TEST_SYNC_POINT("WALWriteError1:2"); + } + SyncPoint::GetInstance()->DisableProcessing(); + + // Data in corrupted WAL are not stored + for (auto i = 0; i < 199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + + // Resume and write a new batch, should be in the WAL + { + WriteBatch batch; + + for (auto i = 200; i < 300; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + Reopen(options); + for (auto i = 0; i < 300; ++i) { + if (i < 100 || i >= 200) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Close(); +} + +TEST_F(DBErrorHandlingFSTest, WALWriteRetryableErrorAutoRecover2) { + // Fail the first recover and try second time. 
+ std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.listeners.emplace_back(listener); + options.paranoid_checks = true; + options.max_bgerror_resume_count = 2; + options.bgerror_resume_retry_interval = 100000; // 0.1 second + Status s; + Random rnd(301); + + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + // For the first batch, write is successful, require sync + { + WriteBatch batch; + + for (auto i = 0; i < 100; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + // For the second batch, the first 2 file Append are successful, then the + // following Append fails due to file system retryable IOError. 
+ { + WriteBatch batch; + int write_error = 0; + + for (auto i = 100; i < 200; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"RecoverFromRetryableBGIOError:BeforeWait0", "WALWriteError2:0"}, + {"WALWriteError2:1", "RecoverFromRetryableBGIOError:BeforeWait1"}, + {"RecoverFromRetryableBGIOError:RecoverSuccess", "WALWriteError2:2"}}); + + SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_fs->SetFilesystemActive(false, error_msg); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(true, s.IsIOError()); + + TEST_SYNC_POINT("WALWriteError2:0"); + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + TEST_SYNC_POINT("WALWriteError2:1"); + TEST_SYNC_POINT("WALWriteError2:2"); + } + SyncPoint::GetInstance()->DisableProcessing(); + + // Data in corrupted WAL are not stored + for (auto i = 0; i < 199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + + // Resume and write a new batch, should be in the WAL + { + WriteBatch batch; + + for (auto i = 200; i < 300; ++i) { + batch.Put(Key(i), rnd.RandomString(1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + Reopen(options); + for (auto i = 0; i < 300; ++i) { + if (i < 100 || i >= 200) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Close(); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index f07a9a702..74e2e62ff 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1137,6 +1137,23 @@ struct DBOptions { // not be 
used for recovery if best_efforts_recovery is true. // Default: false bool best_efforts_recovery = false; + + // It defines how many times db resume is called by a separate thread when + // background retryable IO Error happens. When background retryable IO + // Error happens, SetBGError is called to deal with the error. If the error + // can be auto-recovered (e.g., retryable IO Error during Flush or WAL write), + // then db resume is called in background to recover from the error. If this + // value is 0 or negative, db resume will not be called. + // + // Default: INT_MAX + int max_bgerror_resume_count = INT_MAX; + + // If max_bgerror_resume_count is >= 2, db resume is called multiple times. + // This option decides how long to wait (in microseconds) before retrying if + // the previous resume fails and the conditions for retrying are satisfied. + // + // Default: 1000000 (microseconds). + uint64_t bgerror_resume_retry_interval = 1000000; }; // Options to control the behavior of a database (passed to DB::Open) diff --git a/options/db_options.cc b/options/db_options.cc index 2fad9ef17..635767826 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -354,6 +354,14 @@ std::unordered_map {offsetof(struct DBOptions, best_efforts_recovery), OptionType::kBoolean, OptionVerificationType::kNormal, OptionTypeFlags::kNone, 0}}, + {"max_bgerror_resume_count", + {offsetof(struct DBOptions, max_bgerror_resume_count), + OptionType::kInt, OptionVerificationType::kNormal, + OptionTypeFlags::kNone, 0}}, + {"bgerror_resume_retry_interval", + {offsetof(struct DBOptions, bgerror_resume_retry_interval), + OptionType::kUInt64T, OptionVerificationType::kNormal, + OptionTypeFlags::kNone, 0}}, // The following properties were handled as special cases in ParseOption // This means that the properties could be read from the options file // but never written to the file or compared to each other. 
@@ -465,7 +473,9 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) write_dbid_to_manifest(options.write_dbid_to_manifest), log_readahead_size(options.log_readahead_size), file_checksum_gen_factory(options.file_checksum_gen_factory), - best_efforts_recovery(options.best_efforts_recovery) { + best_efforts_recovery(options.best_efforts_recovery), + max_bgerror_resume_count(options.max_bgerror_resume_count), + bgerror_resume_retry_interval(options.bgerror_resume_retry_interval) { } void ImmutableDBOptions::Dump(Logger* log) const { @@ -619,6 +629,11 @@ void ImmutableDBOptions::Dump(Logger* log) const { : kUnknownFileChecksumFuncName); ROCKS_LOG_HEADER(log, " Options.best_efforts_recovery: %d", static_cast(best_efforts_recovery)); + ROCKS_LOG_HEADER(log, " Options.max_bgerror_resume_count: %d", + max_bgerror_resume_count); + ROCKS_LOG_HEADER(log, + " Options.bgerror_resume_retry_interval: %" PRIu64, + bgerror_resume_retry_interval); } MutableDBOptions::MutableDBOptions() diff --git a/options/db_options.h b/options/db_options.h index dc1120740..84a73eb27 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -88,6 +88,8 @@ struct ImmutableDBOptions { size_t log_readahead_size; std::shared_ptr file_checksum_gen_factory; bool best_efforts_recovery; + int max_bgerror_resume_count; + uint64_t bgerror_resume_retry_interval; }; struct MutableDBOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index d01776284..02fbb2e81 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -146,6 +146,10 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.file_checksum_gen_factory = immutable_db_options.file_checksum_gen_factory; options.best_efforts_recovery = immutable_db_options.best_efforts_recovery; + options.max_bgerror_resume_count = + immutable_db_options.max_bgerror_resume_count; + options.bgerror_resume_retry_interval = + immutable_db_options.bgerror_resume_retry_interval; return 
options; } diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index a45a62035..a87fd1e6d 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -324,7 +324,9 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "avoid_unnecessary_blocking_io=false;" "log_readahead_size=0;" "write_dbid_to_manifest=false;" - "best_efforts_recovery=false", + "best_efforts_recovery=false;" + "max_bgerror_resume_count=2;" + "bgerror_resume_retry_interval=1000000", new_options)); ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),