diff --git a/CMakeLists.txt b/CMakeLists.txt index 6fedf455a..ad7c7a0f8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,6 +481,7 @@ set(SOURCES db/db_info_dumper.cc db/db_iter.cc db/dbformat.cc + db/error_handler.cc db/event_helpers.cc db/experimental.cc db/external_sst_file_ingestion_job.cc @@ -877,6 +878,7 @@ if(WITH_TESTS) db/db_write_test.cc db/dbformat_test.cc db/deletefile_test.cc + db/error_handler_test.cc db/obsolete_files_test.cc db/external_sst_file_basic_test.cc db/external_sst_file_test.cc diff --git a/Makefile b/Makefile index d9b5e11cd..cf0a160f2 100644 --- a/Makefile +++ b/Makefile @@ -429,6 +429,7 @@ TESTS = \ db_table_properties_test \ db_statistics_test \ db_write_test \ + error_handler_test \ autovector_test \ blob_db_test \ cleanable_test \ @@ -1194,6 +1195,9 @@ db_statistics_test: db/db_statistics_test.o db/db_test_util.o $(LIBOBJECTS) $(TE db_write_test: db/db_write_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +error_handler_test: db/error_handler_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index b01eae323..6849ff95e 100644 --- a/TARGETS +++ b/TARGETS @@ -93,6 +93,7 @@ cpp_library( "db/db_info_dumper.cc", "db/db_iter.cc", "db/dbformat.cc", + "db/error_handler.cc", "db/event_helpers.cc", "db/experimental.cc", "db/external_sst_file_ingestion_job.cc", diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 48759d05f..8f9fd1c50 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -25,8 +25,10 @@ #include #include "db/builder.h" +#include "db/db_impl.h" #include "db/db_iter.h" #include "db/dbformat.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "db/log_reader.h" #include "db/log_writer.h" @@ -307,7 +309,7 @@ CompactionJob::CompactionJob( const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, - InstrumentedMutex* db_mutex, Status* db_bg_error, + InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, std::shared_ptr table_cache, @@ -331,7 +333,7 @@ CompactionJob::CompactionJob( output_directory_(output_directory), stats_(stats), db_mutex_(db_mutex), - db_bg_error_(db_bg_error), + db_error_handler_(db_error_handler), existing_snapshots_(std::move(existing_snapshots)), earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), snapshot_checker_(snapshot_checker), @@ -1282,21 +1284,12 @@ Status CompactionJob::FinishCompactionOutputFile( if (sfm->IsMaxAllowedSpaceReached()) { // TODO(ajkr): should we return OK() if max space was reached by the final // compaction output file (similarly to how flush works when full)? - s = Status::NoSpace("Max allowed space was reached"); + s = Status::SpaceLimit("Max allowed space was reached"); TEST_SYNC_POINT( "CompactionJob::FinishCompactionOutputFile:" "MaxAllowedSpaceReached"); InstrumentedMutexLock l(db_mutex_); - if (db_bg_error_->ok()) { - Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError( - cfd->ioptions()->listeners, BackgroundErrorReason::kCompaction, - &new_bg_error, db_mutex_); - if (!new_bg_error.ok()) { - *db_bg_error_ = new_bg_error; - } - } + db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction); } } #endif diff --git a/db/compaction_job.h b/db/compaction_job.h index 05a4ffb1a..a31e8c142 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -47,6 +47,7 @@ namespace rocksdb { class Arena; +class ErrorHandler; class MemTable; class SnapshotChecker; class TableCache; @@ -64,7 +65,7 @@ class CompactionJob { LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, - Status* db_bg_error, + ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, const SnapshotChecker* snapshot_checker, @@ -145,7 +146,7 @@ class CompactionJob { Directory* output_directory_; Statistics* stats_; InstrumentedMutex* db_mutex_; - Status* db_bg_error_; + ErrorHandler* db_error_handler_; // If there were two snapshots with seq numbers s1 and // s2 and s1 < s2, and if we find two instances of a key k1 then lies // entirely within s1 and s2, then the earlier version of k1 can be safely diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 7987ffac0..2246d489f 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -12,6 +12,7 @@ #include "db/column_family.h" #include "db/compaction_job.h" +#include "db/error_handler.h" #include "db/version_set.h" #include "rocksdb/cache.h" #include "rocksdb/db.h" @@ -77,7 +78,8 @@ class CompactionJobTest : public testing::Test { &write_controller_)), shutting_down_(false), preserve_deletes_seqnum_(0), - mock_table_factory_(new mock::MockTableFactory()) { + mock_table_factory_(new mock::MockTableFactory()), + error_handler_(db_options_, &mutex_) { EXPECT_OK(env_->CreateDirIfMissing(dbname_)); db_options_.db_paths.emplace_back(dbname_, std::numeric_limits::max()); @@ -255,13 +257,12 @@ class CompactionJobTest : public testing::Test { EventLogger event_logger(db_options_.info_log.get()); // TODO(yiwu) add a mock snapshot checker and add test for it. SnapshotChecker* snapshot_checker = nullptr; - CompactionJob compaction_job(0, &compaction, db_options_, env_options_, - versions_.get(), &shutting_down_, - preserve_deletes_seqnum_, &log_buffer, - nullptr, nullptr, nullptr, &mutex_, &bg_error_, - snapshots, earliest_write_conflict_snapshot, - snapshot_checker, table_cache_, &event_logger, - false, false, dbname_, &compaction_job_stats_); + CompactionJob compaction_job( + 0, &compaction, db_options_, env_options_, versions_.get(), + &shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr, + nullptr, nullptr, &mutex_, &error_handler_, snapshots, + earliest_write_conflict_snapshot, snapshot_checker, table_cache_, + &event_logger, false, false, dbname_, &compaction_job_stats_); VerifyInitializationOfCompactionJobStats(compaction_job_stats_); compaction_job.Prepare(); @@ -303,7 +304,7 @@ class CompactionJobTest : public testing::Test { ColumnFamilyData* cfd_; std::unique_ptr compaction_filter_; std::shared_ptr merge_op_; - Status bg_error_; + ErrorHandler error_handler_; }; TEST_F(CompactionJobTest, Simple) { diff --git a/db/db_impl.cc b/db/db_impl.cc index adb0652ad..80dc3ff76 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -32,6 +32,7 @@ #include "db/db_info_dumper.h" #include "db/db_iter.h" #include "db/dbformat.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "db/external_sst_file_ingestion_job.h" #include "db/flush_job.h" @@ -215,7 +216,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, // as well. use_custom_gc_(seq_per_batch), preserve_deletes_(options.preserve_deletes), - closed_(false) { + closed_(false), + error_handler_(immutable_db_options_, &mutex_) { env_->GetAbsolutePath(dbname, &db_absolute_path_); // Reserve ten files or so for other uses and give the rest to TableCache. @@ -244,6 +246,43 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, preserve_deletes_seqnum_.store(0); } +Status DBImpl::Resume() { + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB"); + + InstrumentedMutexLock db_mutex(&mutex_); + + if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) { + // Nothing to do + return Status::OK(); + } + + Status s = error_handler_.GetBGError(); + if (s.severity() > Status::Severity::kHardError) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "DB resume requested but failed due to Fatal/Unrecoverable error"); + return s; + } + + JobContext job_context(0); + FindObsoleteFiles(&job_context, true); + error_handler_.ClearBGError(); + mutex_.Unlock(); + + job_context.manifest_file_number = 1; + if (job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(job_context); + } + job_context.Clean(); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); + mutex_.Lock(); + MaybeScheduleFlushOrCompaction(); + + // No need to check BGError again. If something happened, event listener would be + // notified and the operation causing it would have failed + return Status::OK(); +} + // Will lock the mutex_, will wait for completion if wait is true void DBImpl::CancelAllBackgroundWork(bool wait) { InstrumentedMutexLock l(&mutex_); @@ -2879,9 +2918,9 @@ Status DBImpl::IngestExternalFile( std::list::iterator pending_output_elem; { InstrumentedMutexLock l(&mutex_); - if (!bg_error_.ok()) { + if (error_handler_.IsDBStopped()) { // Don't ingest files when there is a bg_error - return bg_error_; + return error_handler_.GetBGError(); } // Make sure that bg cleanup wont delete the files that we are ingesting diff --git a/db/db_impl.h b/db/db_impl.h index f6bfd576c..8ff6db637 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -23,6 +23,8 @@ #include "db/compaction_job.h" #include "db/dbformat.h" #include "db/external_sst_file_ingestion_job.h" +#include "db/error_handler.h" +#include "db/event_helpers.h" #include "db/flush_job.h" #include "db/flush_scheduler.h" #include "db/internal_stats.h" @@ -73,6 +75,9 @@ class DBImpl : public DB { const bool seq_per_batch = false); virtual ~DBImpl(); + using DB::Resume; + virtual Status Resume() override; + // Implementations of the DB interface using DB::Put; virtual Status Put(const WriteOptions& options, @@ -1265,9 +1270,6 @@ class DBImpl : public DB { PrepickedCompaction* prepicked_compaction; }; - // Have we encountered a background error in paranoid mode? - Status bg_error_; - // shall we disable deletion of obsolete files // if 0 the deletion is enabled. // if non-zero, files will not be getting deleted @@ -1425,6 +1427,8 @@ class DBImpl : public DB { // Flag to check whether Close() has been called on this DB bool closed_; + + ErrorHandler error_handler_; }; extern Options SanitizeOptions(const std::string& db, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 34d870e8d..f4f1680f9 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -14,6 +14,7 @@ #include #include "db/builder.h" +#include "db/error_handler.h" #include "db/event_helpers.h" #include "monitoring/iostats_context_imp.h" #include "monitoring/perf_context_imp.h" @@ -93,14 +94,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { // "number < current_log_number". MarkLogsSynced(current_log_number - 1, true, s); if (!s.ok()) { - Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); return s; } @@ -177,18 +171,9 @@ Status DBImpl::FlushMemTableToOutputFile( cfd->current()->storage_info()->LevelSummary(&tmp)); } - if (!s.ok() && !s.IsShutdownInProgress() && - immutable_db_options_.paranoid_checks && bg_error_.ok()) { + if (!s.ok() && !s.IsShutdownInProgress()) { Status new_bg_error = s; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - // if a bad error happened (not ShutdownInProgress), paranoid_checks is - // true, and the error isn't handled by callback, mark DB read-only - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } if (s.ok()) { #ifndef ROCKSDB_LITE @@ -202,18 +187,12 @@ Status DBImpl::FlushMemTableToOutputFile( std::string file_path = MakeTableFileName( cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber()); sfm->OnAddFile(file_path); - if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) { - Status new_bg_error = Status::NoSpace("Max allowed space was reached"); + if (sfm->IsMaxAllowedSpaceReached()) { + Status new_bg_error = Status::SpaceLimit("Max allowed space was reached"); TEST_SYNC_POINT_CALLBACK( "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached", &new_bg_error); - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kFlush, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); } } #endif // ROCKSDB_LITE @@ -674,7 +653,7 @@ Status DBImpl::CompactFilesImpl( env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_, - &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, @@ -736,16 +715,7 @@ Status DBImpl::CompactFilesImpl( "[%s] [JOB %d] Compaction error: %s", c->column_family_data()->GetName().c_str(), job_context->job_id, status.ToString().c_str()); - if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kCompaction, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); } if (output_file_names != nullptr) { @@ -1141,6 +1111,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd, // Wait until the compaction completes s = WaitForFlushMemTable(cfd, &flush_memtable_id); } + TEST_SYNC_POINT("FlushMemTableFinished"); return s; } @@ -1149,7 +1120,7 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, Status s; // Wait until the compaction completes InstrumentedMutexLock l(&mutex_); - while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() && + while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() && (flush_memtable_id == nullptr || cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) { if (shutting_down_.load(std::memory_order_acquire)) { @@ -1163,8 +1134,8 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd, } bg_cv_.Wait(); } - if (!bg_error_.ok()) { - s = bg_error_; + if (error_handler_.IsDBStopped()) { + s = error_handler_.GetBGError(); } return s; } @@ -1383,9 +1354,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) { mutex_.AssertHeld(); - Status status = bg_error_; - if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { - status = Status::ShutdownInProgress(); + Status status; + if (!error_handler_.IsBGWorkStopped()) { + if (shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + } else { + status = error_handler_.GetBGError(); } if (!status.ok()) { @@ -1631,9 +1606,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, is_manual && manual_compaction->disallow_trivial_move; CompactionJobStats compaction_job_stats; - Status status = bg_error_; - if (status.ok() && shutting_down_.load(std::memory_order_acquire)) { - status = Status::ShutdownInProgress(); + Status status; + if (!error_handler_.IsBGWorkStopped()) { + if (shutting_down_.load(std::memory_order_acquire)) { + status = Status::ShutdownInProgress(); + } + } else { + status = error_handler_.GetBGError(); } if (!status.ok()) { @@ -1905,7 +1884,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), GetDataDir(c->column_family_data(), c->output_path_id()), stats_, - &mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + &mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, @@ -1951,16 +1930,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); - if (immutable_db_options_.paranoid_checks && bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kCompaction, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); } if (is_manual) { diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 8032f8333..2b7720e58 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -10,6 +10,7 @@ #ifndef NDEBUG #include "db/db_impl.h" +#include "db/error_handler.h" #include "monitoring/thread_status_updater.h" namespace rocksdb { @@ -134,10 +135,10 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) { while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ || bg_flush_scheduled_ || (wait_unscheduled && unscheduled_compactions_)) && - bg_error_.ok()) { + !error_handler_.IsDBStopped()) { bg_cv_.Wait(); } - return bg_error_; + return error_handler_.GetBGError(); } void DBImpl::TEST_LockMutex() { diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 48753723f..31d014568 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -14,6 +14,7 @@ #include #include "db/builder.h" +#include "db/error_handler.h" #include "options/options_helper.h" #include "rocksdb/wal_filter.h" #include "table/block_based_table_factory.h" diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index e05023ba5..788ffb9c5 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -12,6 +12,7 @@ #define __STDC_FORMAT_MACROS #endif #include +#include "db/error_handler.h" #include "db/event_helpers.h" #include "monitoring/perf_context_imp.h" #include "options/options_helper.h" @@ -676,16 +677,7 @@ void DBImpl::WriteStatusCheck(const Status& status) { if (immutable_db_options_.paranoid_checks && !status.ok() && !status.IsBusy() && !status.IsIncomplete()) { mutex_.Lock(); - if (bg_error_.ok()) { - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kWriteCallback, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; // stop compaction & fail any further writes - } - } + error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); } } @@ -698,15 +690,8 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) { // ignore_missing_column_families. if (!status.ok()) { mutex_.Lock(); - assert(bg_error_.ok()); - Status new_bg_error = status; - // may temporarily unlock and lock the mutex. - EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners, - BackgroundErrorReason::kMemTable, - &new_bg_error, &mutex_); - if (!new_bg_error.ok()) { - bg_error_ = new_bg_error; // stop compaction & fail any further writes - } + assert(!error_handler_.IsBGWorkStopped()); + error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable); mutex_.Unlock(); } } @@ -736,8 +721,8 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options, status = HandleWriteBufferFull(write_context); } - if (UNLIKELY(status.ok() && !bg_error_.ok())) { - status = bg_error_; + if (UNLIKELY(status.ok())) { + status = error_handler_.GetBGError(); } if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { @@ -1176,7 +1161,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, mutex_.Lock(); } - while (bg_error_.ok() && write_controller_.IsStopped()) { + while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) { if (write_options.no_slowdown) { return Status::Incomplete(); } @@ -1192,7 +1177,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes, RecordTick(stats_, STALL_MICROS, time_delayed); } - return bg_error_; + return error_handler_.GetBGError(); } Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, diff --git a/db/error_handler.cc b/db/error_handler.cc new file mode 100644 index 000000000..a5b23a4b8 --- /dev/null +++ b/db/error_handler.cc @@ -0,0 +1,170 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "db/error_handler.h" +#include "db/event_helpers.h" + +namespace rocksdb { + +// Maps to help decide the severity of an error based on the +// BackgroundErrorReason, Code, SubCode and whether db_options.paranoid_checks +// is set or not. There are 3 maps, going from most specific to least specific +// (i.e from all 4 fields in a tuple to only the BackgroundErrorReason and +// paranoid_checks). The less specific map serves as a catch all in case we miss +// a specific error code or subcode. +std::map, + Status::Severity> + ErrorSeverityMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kNoSpace, + true), + Status::Severity::kSoftError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kNoSpace, + false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, Status::SubCode::kSpaceLimit, + true), + Status::Severity::kHardError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kNoSpace, true), + Status::Severity::kSoftError}, + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kNoSpace, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError, + Status::SubCode::kSpaceLimit, true), + Status::Severity::kHardError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, Status::SubCode::kNoSpace, + true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, Status::SubCode::kNoSpace, + false), + Status::Severity::kFatalError}, +}; + +std::map, Status::Severity> + DefaultErrorSeverityMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, + Status::Code::kIOError, false), + Status::Severity::kNoError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlush, + Status::Code::kIOError, false), + Status::Severity::kNoError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kCorruption, true), + Status::Severity::kUnrecoverableError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kCorruption, false), + Status::Severity::kNoError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, + Status::Code::kIOError, false), + Status::Severity::kNoError}, +}; + +std::map, Status::Severity> + DefaultReasonMap = { + // Errors during BG compaction + {std::make_tuple(BackgroundErrorReason::kCompaction, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kCompaction, false), + Status::Severity::kNoError}, + // Errors during BG flush + {std::make_tuple(BackgroundErrorReason::kFlush, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kFlush, false), + Status::Severity::kNoError}, + // Errors during Write + {std::make_tuple(BackgroundErrorReason::kWriteCallback, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kWriteCallback, false), + Status::Severity::kFatalError}, + // Errors during Memtable update + {std::make_tuple(BackgroundErrorReason::kMemTable, true), + Status::Severity::kFatalError}, + {std::make_tuple(BackgroundErrorReason::kMemTable, false), + Status::Severity::kFatalError}, +}; + +Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) { + db_mutex_->AssertHeld(); + + if (bg_err.ok()) { + return Status::OK(); + } + + bool paranoid = db_options_.paranoid_checks; + Status::Severity sev = Status::Severity::kFatalError; + Status new_bg_err; + bool found = false; + + { + auto entry = ErrorSeverityMap.find(std::make_tuple(reason, bg_err.code(), + bg_err.subcode(), paranoid)); + if (entry != ErrorSeverityMap.end()) { + sev = entry->second; + found = true; + } + } + + if (!found) { + auto entry = DefaultErrorSeverityMap.find(std::make_tuple(reason, + bg_err.code(), paranoid)); + if (entry != DefaultErrorSeverityMap.end()) { + sev = entry->second; + found = true; + } + } + + if (!found) { + auto entry = DefaultReasonMap.find(std::make_tuple(reason, paranoid)); + if (entry != DefaultReasonMap.end()) { + sev = entry->second; + } + } + + new_bg_err = Status(bg_err, sev); + if (!new_bg_err.ok()) { + Status s = new_bg_err; + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_); + if (!s.ok() && (s.severity() > bg_error_.severity())) { + bg_error_ = s; + } + } + + return bg_error_; +} + +} diff --git a/db/error_handler.h b/db/error_handler.h new file mode 100644 index 000000000..8a1a411fc --- /dev/null +++ b/db/error_handler.h @@ -0,0 +1,52 @@ +// Copyright (c) 2018-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once + +#include "monitoring/instrumented_mutex.h" +#include "options/db_options.h" +#include "rocksdb/listener.h" +#include "rocksdb/status.h" + +namespace rocksdb { + +class ErrorHandler { + public: + ErrorHandler(const ImmutableDBOptions& db_options, + InstrumentedMutex* db_mutex) + : db_options_(db_options), + bg_error_(Status::OK()), + db_mutex_(db_mutex) + {} + ~ErrorHandler() {} + + Status::Severity GetErrorSeverity(BackgroundErrorReason reason, + Status::Code code, Status::SubCode subcode); + + Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); + + Status GetBGError() + { + return bg_error_; + } + + void ClearBGError() { + bg_error_ = Status::OK(); + } + + bool IsDBStopped() { + return !bg_error_.ok(); + } + + bool IsBGWorkStopped() { + return !bg_error_.ok(); + } + + private: + const ImmutableDBOptions& db_options_; + Status bg_error_; + InstrumentedMutex* db_mutex_; +}; + +} diff --git a/db/error_handler_test.cc b/db/error_handler_test.cc new file mode 100644 index 000000000..abd5663d8 --- /dev/null +++ b/db/error_handler_test.cc @@ -0,0 +1,138 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/db_test_util.h" +#include "port/stack_trace.h" +#include "rocksdb/perf_context.h" +#include "util/fault_injection_test_env.h" +#if !defined(ROCKSDB_LITE) +#include "util/sync_point.h" +#endif + +namespace rocksdb { + +class DBErrorHandlingTest : public DBTestBase { + public: + DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {} +}; + +class DBErrorHandlingEnv : public EnvWrapper { + public: + DBErrorHandlingEnv() : EnvWrapper(Env::Default()), + trig_no_space(false), trig_io_error(false) {} + + void SetTrigNoSpace() {trig_no_space = true;} + void SetTrigIoError() {trig_io_error = true;} + private: + bool trig_no_space; + bool trig_io_error; +}; + +TEST_F(DBErrorHandlingTest, FLushWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::Start", [&](void *) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, CompactionWriteError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void *) { + fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space")); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError); + + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); +} + +TEST_F(DBErrorHandlingTest, CorruptionError) { + std::unique_ptr fault_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.env = fault_env.get(); + Status s; + DestroyAndReopen(options); + + Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"FlushMemTableFinished", "BackgroundCallCompaction:0"}}); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", [&](void *) { + fault_env->SetFilesystemActive(false, Status::Corruption("Corruption")); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kUnrecoverableError); + + fault_env->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_NE(s, Status::OK()); + Destroy(options); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + rocksdb::port::InstallStackTraceHandler(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/flush_job.cc b/db/flush_job.cc index 0e4657984..06132fb86 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -187,6 +187,7 @@ void FlushJob::PickMemTable() { Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta) { + TEST_SYNC_POINT("FlushJob::Start"); db_mutex_->AssertHeld(); assert(pick_memtable_called); AutoThreadOperationStageUpdater stage_run( diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 80e9eb206..f3aa39775 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -170,6 +170,8 @@ class DB { const std::vector& column_families, std::vector* handles, DB** dbptr); + virtual Status Resume() { return Status::NotSupported(); } + // Close the DB by releasing resources, closing files etc. This should be // called before calling the destructor so that the caller can get back a // status in case there are any errors. This will not fsync the WAL files. diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index c803f89bb..e3576b2aa 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -25,7 +25,7 @@ namespace rocksdb { class Status { public: // Create a success status. - Status() : code_(kOk), subcode_(kNone), state_(nullptr) {} + Status() : code_(kOk), subcode_(kNone), sev_(kNoError), state_(nullptr) {} ~Status() { free((void *) state_); } // Copy the specified status. @@ -44,7 +44,7 @@ class Status { bool operator==(const Status& rhs) const; bool operator!=(const Status& rhs) const; - enum Code { + enum Code : unsigned char { kOk = 0, kNotFound = 1, kCorruption = 2, @@ -64,7 +64,7 @@ class Status { Code code() const { return code_; } - enum SubCode { + enum SubCode : unsigned char { kNone = 0, kMutexTimeout = 1, kLockTimeout = 2, @@ -73,11 +73,24 @@ class Status { kDeadlock = 5, kStaleFile = 6, kMemoryLimit = 7, + kSpaceLimit = 8, kMaxSubCode }; SubCode subcode() const { return subcode_; } + enum Severity : unsigned char { + kNoError = 0, + kSoftError = 1, + kHardError = 2, + kFatalError = 3, + kUnrecoverableError = 4, + kMaxSeverity + }; + + Status(const Status& s, Severity sev); + Severity severity() const { return sev_; } + // Returns a C style string indicating the message of the Status const char* getState() const { return state_; } @@ -181,6 +194,11 @@ class Status { return Status(kAborted, kMemoryLimit, msg, msg2); } + static Status SpaceLimit() { return Status(kIOError, kSpaceLimit); } + static Status SpaceLimit(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIOError, kSpaceLimit, msg, msg2); + } + // Returns true iff the status indicates success. bool ok() const { return code() == kOk; } @@ -261,12 +279,13 @@ class Status { // state_[4..] == message Code code_; SubCode subcode_; + Severity sev_; const char* state_; static const char* msgs[static_cast(kMaxSubCode)]; explicit Status(Code _code, SubCode _subcode = kNone) - : code_(_code), subcode_(_subcode), state_(nullptr) {} + : code_(_code), subcode_(_subcode), sev_(kNoError), state_(nullptr) {} Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2); Status(Code _code, const Slice& msg, const Slice& msg2) @@ -275,7 +294,11 @@ class Status { static const char* CopyState(const char* s); }; -inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_) { +inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_), sev_(s.sev_) { + state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); +} +inline Status::Status(const Status& s, Severity sev) + : code_(s.code_), subcode_(s.subcode_), sev_(sev) { state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } inline Status& Status::operator=(const Status& s) { @@ -284,6 +307,7 @@ inline Status& Status::operator=(const Status& s) { if (this != &s) { code_ = s.code_; subcode_ = s.subcode_; + sev_ = s.sev_; free((void *) state_); state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_); } @@ -308,6 +332,8 @@ inline Status& Status::operator=(Status&& s) s.code_ = kOk; subcode_ = std::move(s.subcode_); s.subcode_ = kNone; + sev_ = std::move(s.sev_); + s.sev_ = kNoError; free((void *)state_); state_ = nullptr; std::swap(state_, s.state_); diff --git a/src.mk b/src.mk index 047eaa3f2..44bb1de13 100644 --- a/src.mk +++ b/src.mk @@ -25,6 +25,7 @@ LIB_SOURCES = \ db/db_info_dumper.cc \ db/db_iter.cc \ db/dbformat.cc \ + db/error_handler.cc \ db/event_helpers.cc \ db/experimental.cc \ db/external_sst_file_ingestion_job.cc \ @@ -293,6 +294,7 @@ MAIN_SOURCES = \ db/dbformat_test.cc \ db/deletefile_test.cc \ db/env_timed_test.cc \ + db/error_handler_test.cc \ db/external_sst_file_basic_test.cc \ db/external_sst_file_test.cc \ db/fault_injection_test.cc \ diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc index 3b3a8b935..46e1e0d77 100644 --- a/util/fault_injection_test_env.cc +++ b/util/fault_injection_test_env.cc @@ -121,7 +121,7 @@ TestWritableFile::~TestWritableFile() { Status TestWritableFile::Append(const Slice& data) { if (!env_->IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return env_->GetError(); } Status s = target_->Append(data); if (s.ok()) { @@ -172,7 +172,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, unique_ptr* result, const EnvOptions& soptions) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } // Not allow overwriting files Status s = target()->FileExists(fname); @@ -199,7 +199,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } Status s = EnvWrapper::DeleteFile(f); if (!s.ok()) { @@ -216,7 +216,7 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { Status FaultInjectionTestEnv::RenameFile(const std::string& s, const std::string& t) { if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); + return GetError(); } Status ret = EnvWrapper::RenameFile(s, t); diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h index 1992ab52e..1a62c619e 100644 --- a/util/fault_injection_test_env.h +++ b/util/fault_injection_test_env.h @@ -145,12 +145,20 @@ class FaultInjectionTestEnv : public EnvWrapper { MutexLock l(&mutex_); return filesystem_active_; } - void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; } - void SetFilesystemActive(bool active) { + void SetFilesystemActiveNoLock(bool active, + Status error = Status::Corruption("Not active")) { + filesystem_active_ = active; + if (!active) { + error_ = error; + } + } + void SetFilesystemActive(bool active, + Status error = Status::Corruption("Not active")) { MutexLock l(&mutex_); - SetFilesystemActiveNoLock(active); + SetFilesystemActiveNoLock(active, error); } void AssertNoOpenFile() { assert(open_files_.empty()); } + Status GetError() { return error_; } private: port::Mutex mutex_; @@ -159,6 +167,7 @@ class FaultInjectionTestEnv : public EnvWrapper { std::unordered_map> dir_to_new_files_since_last_sync_; bool filesystem_active_; // Record flushes, syncs, writes + Status error_; }; } // namespace rocksdb diff --git a/util/status.cc b/util/status.cc index f302d2ead..aaad67367 100644 --- a/util/status.cc +++ b/util/status.cc @@ -24,7 +24,7 @@ const char* Status::CopyState(const char* state) { } Status::Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2) - : code_(_code), subcode_(_subcode) { + : code_(_code), subcode_(_subcode), sev_(kNoError) { assert(code_ != kOk); assert(subcode_ != kMaxSubCode); const size_t len1 = msg.size(); diff --git a/util/status_message.cc b/util/status_message.cc index 6e9d4e4f7..7150e817e 100644 --- a/util/status_message.cc +++ b/util/status_message.cc @@ -15,7 +15,8 @@ const char* Status::msgs[] = { "No space left on device", // kNoSpace "Deadlock", // kDeadlock "Stale file handle", // kStaleFile - "Memory limit reached" // kMemoryLimit + "Memory limit reached", // kMemoryLimit + "Space limit reached" // kSpaceLimit }; } // namespace rocksdb