Allow DB resume after background errors (#3997)

Summary:
Currently, if RocksDB encounters errors during a write operation (user requested or BG operations), it sets DBImpl::bg_error_ and fails subsequent writes. This PR allows the DB to be resumed for certain classes of errors. It consists of 3 parts -
1. Introduce Status::Severity in rocksdb::Status to indicate whether a given error can be recovered from or not
2. Refactor the error handling code so that setting bg_error_ and deciding on severity is in one place
3. Provide an API for the user to clear the error and resume the DB instance

This whole change is broken up into multiple PRs. Initially, we only allow clearing the error for Status::NoSpace() errors during background flush/compaction. Subsequent PRs will expand this to include more errors and foreground operations such as Put(), and implement a polling mechanism for out-of-space errors.
Closes https://github.com/facebook/rocksdb/pull/3997

Differential Revision: D8653831

Pulled By: anand1976

fbshipit-source-id: 6dc835c76122443a7668497c0226b4f072bc6afd
main
Anand Ananthabhotla 7 years ago committed by Facebook Github Bot
parent 26d67e357e
commit 52d4c9b7f6
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 1
      TARGETS
  4. 19
      db/compaction_job.cc
  5. 5
      db/compaction_job.h
  6. 19
      db/compaction_job_test.cc
  7. 45
      db/db_impl.cc
  8. 10
      db/db_impl.h
  9. 88
      db/db_impl_compaction_flush.cc
  10. 5
      db/db_impl_debug.cc
  11. 1
      db/db_impl_open.cc
  12. 31
      db/db_impl_write.cc
  13. 170
      db/error_handler.cc
  14. 52
      db/error_handler.h
  15. 138
      db/error_handler_test.cc
  16. 1
      db/flush_job.cc
  17. 2
      include/rocksdb/db.h
  18. 36
      include/rocksdb/status.h
  19. 2
      src.mk
  20. 8
      util/fault_injection_test_env.cc
  21. 15
      util/fault_injection_test_env.h
  22. 2
      util/status.cc
  23. 3
      util/status_message.cc

@ -481,6 +481,7 @@ set(SOURCES
db/db_info_dumper.cc
db/db_iter.cc
db/dbformat.cc
db/error_handler.cc
db/event_helpers.cc
db/experimental.cc
db/external_sst_file_ingestion_job.cc
@ -877,6 +878,7 @@ if(WITH_TESTS)
db/db_write_test.cc
db/dbformat_test.cc
db/deletefile_test.cc
db/error_handler_test.cc
db/obsolete_files_test.cc
db/external_sst_file_basic_test.cc
db/external_sst_file_test.cc

@ -429,6 +429,7 @@ TESTS = \
db_table_properties_test \
db_statistics_test \
db_write_test \
error_handler_test \
autovector_test \
blob_db_test \
cleanable_test \
@ -1194,6 +1195,9 @@ db_statistics_test: db/db_statistics_test.o db/db_test_util.o $(LIBOBJECTS) $(TE
db_write_test: db/db_write_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
error_handler_test: db/error_handler_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
external_sst_file_basic_test: db/external_sst_file_basic_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -93,6 +93,7 @@ cpp_library(
"db/db_info_dumper.cc",
"db/db_iter.cc",
"db/dbformat.cc",
"db/error_handler.cc",
"db/event_helpers.cc",
"db/experimental.cc",
"db/external_sst_file_ingestion_job.cc",

@ -25,8 +25,10 @@
#include <vector>
#include "db/builder.h"
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
@ -307,7 +309,7 @@ CompactionJob::CompactionJob(
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, Status* db_bg_error,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
@ -331,7 +333,7 @@ CompactionJob::CompactionJob(
output_directory_(output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_bg_error_(db_bg_error),
db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
@ -1282,21 +1284,12 @@ Status CompactionJob::FinishCompactionOutputFile(
if (sfm->IsMaxAllowedSpaceReached()) {
// TODO(ajkr): should we return OK() if max space was reached by the final
// compaction output file (similarly to how flush works when full)?
s = Status::NoSpace("Max allowed space was reached");
s = Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT(
"CompactionJob::FinishCompactionOutputFile:"
"MaxAllowedSpaceReached");
InstrumentedMutexLock l(db_mutex_);
if (db_bg_error_->ok()) {
Status new_bg_error = s;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(
cfd->ioptions()->listeners, BackgroundErrorReason::kCompaction,
&new_bg_error, db_mutex_);
if (!new_bg_error.ok()) {
*db_bg_error_ = new_bg_error;
}
}
db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
}
}
#endif

@ -47,6 +47,7 @@
namespace rocksdb {
class Arena;
class ErrorHandler;
class MemTable;
class SnapshotChecker;
class TableCache;
@ -64,7 +65,7 @@ class CompactionJob {
LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory,
Statistics* stats, InstrumentedMutex* db_mutex,
Status* db_bg_error,
ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker,
@ -145,7 +146,7 @@ class CompactionJob {
Directory* output_directory_;
Statistics* stats_;
InstrumentedMutex* db_mutex_;
Status* db_bg_error_;
ErrorHandler* db_error_handler_;
// If there were two snapshots with seq numbers s1 and
// s2 and s1 < s2, and if we find two instances of a key k1 then lies
// entirely within s1 and s2, then the earlier version of k1 can be safely

@ -12,6 +12,7 @@
#include "db/column_family.h"
#include "db/compaction_job.h"
#include "db/error_handler.h"
#include "db/version_set.h"
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
@ -77,7 +78,8 @@ class CompactionJobTest : public testing::Test {
&write_controller_)),
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()) {
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(db_options_, &mutex_) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());
@ -255,13 +257,12 @@ class CompactionJobTest : public testing::Test {
EventLogger event_logger(db_options_.info_log.get());
// TODO(yiwu) add a mock snapshot checker and add test for it.
SnapshotChecker* snapshot_checker = nullptr;
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
versions_.get(), &shutting_down_,
preserve_deletes_seqnum_, &log_buffer,
nullptr, nullptr, nullptr, &mutex_, &bg_error_,
snapshots, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger,
false, false, dbname_, &compaction_job_stats_);
CompactionJob compaction_job(
0, &compaction, db_options_, env_options_, versions_.get(),
&shutting_down_, preserve_deletes_seqnum_, &log_buffer, nullptr,
nullptr, nullptr, &mutex_, &error_handler_, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, table_cache_,
&event_logger, false, false, dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
compaction_job.Prepare();
@ -303,7 +304,7 @@ class CompactionJobTest : public testing::Test {
ColumnFamilyData* cfd_;
std::unique_ptr<CompactionFilter> compaction_filter_;
std::shared_ptr<MergeOperator> merge_op_;
Status bg_error_;
ErrorHandler error_handler_;
};
TEST_F(CompactionJobTest, Simple) {

@ -32,6 +32,7 @@
#include "db/db_info_dumper.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/flush_job.h"
@ -215,7 +216,8 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// as well.
use_custom_gc_(seq_per_batch),
preserve_deletes_(options.preserve_deletes),
closed_(false) {
closed_(false),
error_handler_(immutable_db_options_, &mutex_) {
env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache.
@ -244,6 +246,43 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
preserve_deletes_seqnum_.store(0);
}
Status DBImpl::Resume() {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Resuming DB");
InstrumentedMutexLock db_mutex(&mutex_);
if (!error_handler_.IsDBStopped() && !error_handler_.IsBGWorkStopped()) {
// Nothing to do
return Status::OK();
}
Status s = error_handler_.GetBGError();
if (s.severity() > Status::Severity::kHardError) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Fatal/Unrecoverable error");
return s;
}
JobContext job_context(0);
FindObsoleteFiles(&job_context, true);
error_handler_.ClearBGError();
mutex_.Unlock();
job_context.manifest_file_number = 1;
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
mutex_.Lock();
MaybeScheduleFlushOrCompaction();
// No need to check BGError again. If something happened, event listener would be
// notified and the operation causing it would have failed
return Status::OK();
}
// Will lock the mutex_, will wait for completion if wait is true
void DBImpl::CancelAllBackgroundWork(bool wait) {
InstrumentedMutexLock l(&mutex_);
@ -2879,9 +2918,9 @@ Status DBImpl::IngestExternalFile(
std::list<uint64_t>::iterator pending_output_elem;
{
InstrumentedMutexLock l(&mutex_);
if (!bg_error_.ok()) {
if (error_handler_.IsDBStopped()) {
// Don't ingest files when there is a bg_error
return bg_error_;
return error_handler_.GetBGError();
}
// Make sure that bg cleanup wont delete the files that we are ingesting

@ -23,6 +23,8 @@
#include "db/compaction_job.h"
#include "db/dbformat.h"
#include "db/external_sst_file_ingestion_job.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/flush_job.h"
#include "db/flush_scheduler.h"
#include "db/internal_stats.h"
@ -73,6 +75,9 @@ class DBImpl : public DB {
const bool seq_per_batch = false);
virtual ~DBImpl();
using DB::Resume;
virtual Status Resume() override;
// Implementations of the DB interface
using DB::Put;
virtual Status Put(const WriteOptions& options,
@ -1265,9 +1270,6 @@ class DBImpl : public DB {
PrepickedCompaction* prepicked_compaction;
};
// Have we encountered a background error in paranoid mode?
Status bg_error_;
// shall we disable deletion of obsolete files
// if 0 the deletion is enabled.
// if non-zero, files will not be getting deleted
@ -1425,6 +1427,8 @@ class DBImpl : public DB {
// Flag to check whether Close() has been called on this DB
bool closed_;
ErrorHandler error_handler_;
};
extern Options SanitizeOptions(const std::string& db,

@ -14,6 +14,7 @@
#include <inttypes.h>
#include "db/builder.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
@ -93,14 +94,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
Status new_bg_error = s;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kFlush,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error;
}
error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
}
@ -177,18 +171,9 @@ Status DBImpl::FlushMemTableToOutputFile(
cfd->current()->storage_info()->LevelSummary(&tmp));
}
if (!s.ok() && !s.IsShutdownInProgress() &&
immutable_db_options_.paranoid_checks && bg_error_.ok()) {
if (!s.ok() && !s.IsShutdownInProgress()) {
Status new_bg_error = s;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kFlush,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
// if a bad error happened (not ShutdownInProgress), paranoid_checks is
// true, and the error isn't handled by callback, mark DB read-only
bg_error_ = new_bg_error;
}
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
@ -202,18 +187,12 @@ Status DBImpl::FlushMemTableToOutputFile(
std::string file_path = MakeTableFileName(
cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
Status new_bg_error = Status::NoSpace("Max allowed space was reached");
if (sfm->IsMaxAllowedSpaceReached()) {
Status new_bg_error = Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
&new_bg_error);
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kFlush,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error;
}
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
#endif // ROCKSDB_LITE
@ -674,7 +653,7 @@ Status DBImpl::CompactFilesImpl(
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
&error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
@ -736,16 +715,7 @@ Status DBImpl::CompactFilesImpl(
"[%s] [JOB %d] Compaction error: %s",
c->column_family_data()->GetName().c_str(),
job_context->job_id, status.ToString().c_str());
if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
Status new_bg_error = status;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kCompaction,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error;
}
}
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (output_file_names != nullptr) {
@ -1141,6 +1111,7 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
// Wait until the compaction completes
s = WaitForFlushMemTable(cfd, &flush_memtable_id);
}
TEST_SYNC_POINT("FlushMemTableFinished");
return s;
}
@ -1149,7 +1120,7 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
Status s;
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
while (cfd->imm()->NumNotFlushed() > 0 && bg_error_.ok() &&
while (cfd->imm()->NumNotFlushed() > 0 && !error_handler_.IsDBStopped() &&
(flush_memtable_id == nullptr ||
cfd->imm()->GetEarliestMemTableID() <= *flush_memtable_id)) {
if (shutting_down_.load(std::memory_order_acquire)) {
@ -1163,8 +1134,8 @@ Status DBImpl::WaitForFlushMemTable(ColumnFamilyData* cfd,
}
bg_cv_.Wait();
}
if (!bg_error_.ok()) {
s = bg_error_;
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
return s;
}
@ -1383,9 +1354,13 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld();
Status status = bg_error_;
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
Status status;
if (!error_handler_.IsBGWorkStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
}
} else {
status = error_handler_.GetBGError();
}
if (!status.ok()) {
@ -1631,9 +1606,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
is_manual && manual_compaction->disallow_trivial_move;
CompactionJobStats compaction_job_stats;
Status status = bg_error_;
if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
Status status;
if (!error_handler_.IsBGWorkStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
}
} else {
status = error_handler_.GetBGError();
}
if (!status.ok()) {
@ -1905,7 +1884,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
GetDataDir(c->column_family_data(), c->output_path_id()), stats_,
&mutex_, &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
&mutex_, &error_handler_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
@ -1951,16 +1930,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
Status new_bg_error = status;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kCompaction,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error;
}
}
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (is_manual) {

@ -10,6 +10,7 @@
#ifndef NDEBUG
#include "db/db_impl.h"
#include "db/error_handler.h"
#include "monitoring/thread_status_updater.h"
namespace rocksdb {
@ -134,10 +135,10 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
(wait_unscheduled && unscheduled_compactions_)) &&
bg_error_.ok()) {
!error_handler_.IsDBStopped()) {
bg_cv_.Wait();
}
return bg_error_;
return error_handler_.GetBGError();
}
void DBImpl::TEST_LockMutex() {

@ -14,6 +14,7 @@
#include <inttypes.h>
#include "db/builder.h"
#include "db/error_handler.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
#include "table/block_based_table_factory.h"

@ -12,6 +12,7 @@
#define __STDC_FORMAT_MACROS
#endif
#include <inttypes.h>
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "monitoring/perf_context_imp.h"
#include "options/options_helper.h"
@ -676,16 +677,7 @@ void DBImpl::WriteStatusCheck(const Status& status) {
if (immutable_db_options_.paranoid_checks && !status.ok() &&
!status.IsBusy() && !status.IsIncomplete()) {
mutex_.Lock();
if (bg_error_.ok()) {
Status new_bg_error = status;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kWriteCallback,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error; // stop compaction & fail any further writes
}
}
error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback);
mutex_.Unlock();
}
}
@ -698,15 +690,8 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
// ignore_missing_column_families.
if (!status.ok()) {
mutex_.Lock();
assert(bg_error_.ok());
Status new_bg_error = status;
// may temporarily unlock and lock the mutex.
EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
BackgroundErrorReason::kMemTable,
&new_bg_error, &mutex_);
if (!new_bg_error.ok()) {
bg_error_ = new_bg_error; // stop compaction & fail any further writes
}
assert(!error_handler_.IsBGWorkStopped());
error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable);
mutex_.Unlock();
}
}
@ -736,8 +721,8 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = HandleWriteBufferFull(write_context);
}
if (UNLIKELY(status.ok() && !bg_error_.ok())) {
status = bg_error_;
if (UNLIKELY(status.ok())) {
status = error_handler_.GetBGError();
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
@ -1176,7 +1161,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
mutex_.Lock();
}
while (bg_error_.ok() && write_controller_.IsStopped()) {
while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) {
if (write_options.no_slowdown) {
return Status::Incomplete();
}
@ -1192,7 +1177,7 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
RecordTick(stats_, STALL_MICROS, time_delayed);
}
return bg_error_;
return error_handler_.GetBGError();
}
Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,

@ -0,0 +1,170 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/error_handler.h"
#include "db/event_helpers.h"
namespace rocksdb {
// Maps to help decide the severity of an error based on the
// BackgroundErrorReason, Code, SubCode and whether db_options.paranoid_checks
// is set or not. There are 3 maps, going from most specific to least specific
// (i.e from all 4 fields in a tuple to only the BackgroundErrorReason and
// paranoid_checks). The less specific map serves as a catch all in case we miss
// a specific error code or subcode.
std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
Status::Severity>
ErrorSeverityMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kSoftError},
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kIOError, Status::SubCode::kSpaceLimit,
true),
Status::Severity::kHardError},
// Errors during BG flush
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::SubCode::kNoSpace, true),
Status::Severity::kSoftError},
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::SubCode::kNoSpace, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::SubCode::kSpaceLimit, true),
Status::Severity::kHardError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kFatalError},
};
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity>
DefaultErrorSeverityMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kCorruption, true),
Status::Severity::kUnrecoverableError},
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kCorruption, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kCompaction,
Status::Code::kIOError, false),
Status::Severity::kNoError},
// Errors during BG flush
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kCorruption, true),
Status::Severity::kUnrecoverableError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kCorruption, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kIOError, false),
Status::Severity::kNoError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kCorruption, true),
Status::Severity::kUnrecoverableError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kCorruption, false),
Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, false),
Status::Severity::kNoError},
};
std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
DefaultReasonMap = {
// Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kCompaction, false),
Status::Severity::kNoError},
// Errors during BG flush
{std::make_tuple(BackgroundErrorReason::kFlush, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush, false),
Status::Severity::kNoError},
// Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback, false),
Status::Severity::kFatalError},
// Errors during Memtable update
{std::make_tuple(BackgroundErrorReason::kMemTable, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kMemTable, false),
Status::Severity::kFatalError},
};
Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) {
db_mutex_->AssertHeld();
if (bg_err.ok()) {
return Status::OK();
}
bool paranoid = db_options_.paranoid_checks;
Status::Severity sev = Status::Severity::kFatalError;
Status new_bg_err;
bool found = false;
{
auto entry = ErrorSeverityMap.find(std::make_tuple(reason, bg_err.code(),
bg_err.subcode(), paranoid));
if (entry != ErrorSeverityMap.end()) {
sev = entry->second;
found = true;
}
}
if (!found) {
auto entry = DefaultErrorSeverityMap.find(std::make_tuple(reason,
bg_err.code(), paranoid));
if (entry != DefaultErrorSeverityMap.end()) {
sev = entry->second;
found = true;
}
}
if (!found) {
auto entry = DefaultReasonMap.find(std::make_tuple(reason, paranoid));
if (entry != DefaultReasonMap.end()) {
sev = entry->second;
}
}
new_bg_err = Status(bg_err, sev);
if (!new_bg_err.ok()) {
Status s = new_bg_err;
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_);
if (!s.ok() && (s.severity() > bg_error_.severity())) {
bg_error_ = s;
}
}
return bg_error_;
}
}

@ -0,0 +1,52 @@
// Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "rocksdb/listener.h"
#include "rocksdb/status.h"
namespace rocksdb {
class ErrorHandler {
public:
ErrorHandler(const ImmutableDBOptions& db_options,
InstrumentedMutex* db_mutex)
: db_options_(db_options),
bg_error_(Status::OK()),
db_mutex_(db_mutex)
{}
~ErrorHandler() {}
Status::Severity GetErrorSeverity(BackgroundErrorReason reason,
Status::Code code, Status::SubCode subcode);
Status SetBGError(const Status& bg_err, BackgroundErrorReason reason);
Status GetBGError()
{
return bg_error_;
}
void ClearBGError() {
bg_error_ = Status::OK();
}
bool IsDBStopped() {
return !bg_error_.ok();
}
bool IsBGWorkStopped() {
return !bg_error_.ok();
}
private:
const ImmutableDBOptions& db_options_;
Status bg_error_;
InstrumentedMutex* db_mutex_;
};
}

@ -0,0 +1,138 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "util/sync_point.h"
#endif
namespace rocksdb {
class DBErrorHandlingTest : public DBTestBase {
public:
DBErrorHandlingTest() : DBTestBase("/db_error_handling_test") {}
};
class DBErrorHandlingEnv : public EnvWrapper {
public:
DBErrorHandlingEnv() : EnvWrapper(Env::Default()),
trig_no_space(false), trig_io_error(false) {}
void SetTrigNoSpace() {trig_no_space = true;}
void SetTrigIoError() {trig_io_error = true;}
private:
bool trig_no_space;
bool trig_io_error;
};
TEST_F(DBErrorHandlingTest, FLushWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = fault_env.get();
Status s;
DestroyAndReopen(options);
Put(Key(0), "va;");
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::Start", [&](void *) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
});
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError);
fault_env->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Destroy(options);
}
TEST_F(DBErrorHandlingTest, CompactionWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2;
options.env = fault_env.get();
Status s;
DestroyAndReopen(options);
Put(Key(0), "va;");
Put(Key(2), "va;");
s = Flush();
ASSERT_EQ(s, Status::OK());
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"FlushMemTableFinished", "BackgroundCallCompaction:0"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BackgroundCallCompaction:0", [&](void *) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Put(Key(1), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError);
fault_env->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Destroy(options);
}
TEST_F(DBErrorHandlingTest, CorruptionError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2;
options.env = fault_env.get();
Status s;
DestroyAndReopen(options);
Put(Key(0), "va;");
Put(Key(2), "va;");
s = Flush();
ASSERT_EQ(s, Status::OK());
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"FlushMemTableFinished", "BackgroundCallCompaction:0"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"BackgroundCallCompaction:0", [&](void *) {
fault_env->SetFilesystemActive(false, Status::Corruption("Corruption"));
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Put(Key(1), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kUnrecoverableError);
fault_env->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_NE(s, Status::OK());
Destroy(options);
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -187,6 +187,7 @@ void FlushJob::PickMemTable() {
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
FileMetaData* file_meta) {
TEST_SYNC_POINT("FlushJob::Start");
db_mutex_->AssertHeld();
assert(pick_memtable_called);
AutoThreadOperationStageUpdater stage_run(

@ -170,6 +170,8 @@ class DB {
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, DB** dbptr);
virtual Status Resume() { return Status::NotSupported(); }
// Close the DB by releasing resources, closing files etc. This should be
// called before calling the destructor so that the caller can get back a
// status in case there are any errors. This will not fsync the WAL files.

@ -25,7 +25,7 @@ namespace rocksdb {
class Status {
public:
// Create a success status.
Status() : code_(kOk), subcode_(kNone), state_(nullptr) {}
Status() : code_(kOk), subcode_(kNone), sev_(kNoError), state_(nullptr) {}
~Status() { free((void *) state_); }
// Copy the specified status.
@ -44,7 +44,7 @@ class Status {
bool operator==(const Status& rhs) const;
bool operator!=(const Status& rhs) const;
enum Code {
enum Code : unsigned char {
kOk = 0,
kNotFound = 1,
kCorruption = 2,
@ -64,7 +64,7 @@ class Status {
Code code() const { return code_; }
enum SubCode {
enum SubCode : unsigned char {
kNone = 0,
kMutexTimeout = 1,
kLockTimeout = 2,
@ -73,11 +73,24 @@ class Status {
kDeadlock = 5,
kStaleFile = 6,
kMemoryLimit = 7,
kSpaceLimit = 8,
kMaxSubCode
};
SubCode subcode() const { return subcode_; }
enum Severity : unsigned char {
kNoError = 0,
kSoftError = 1,
kHardError = 2,
kFatalError = 3,
kUnrecoverableError = 4,
kMaxSeverity
};
Status(const Status& s, Severity sev);
Severity severity() const { return sev_; }
// Returns a C style string indicating the message of the Status
const char* getState() const { return state_; }
@ -181,6 +194,11 @@ class Status {
return Status(kAborted, kMemoryLimit, msg, msg2);
}
static Status SpaceLimit() { return Status(kIOError, kSpaceLimit); }
static Status SpaceLimit(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIOError, kSpaceLimit, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return code() == kOk; }
@ -261,12 +279,13 @@ class Status {
// state_[4..] == message
Code code_;
SubCode subcode_;
Severity sev_;
const char* state_;
static const char* msgs[static_cast<int>(kMaxSubCode)];
explicit Status(Code _code, SubCode _subcode = kNone)
: code_(_code), subcode_(_subcode), state_(nullptr) {}
: code_(_code), subcode_(_subcode), sev_(kNoError), state_(nullptr) {}
Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2);
Status(Code _code, const Slice& msg, const Slice& msg2)
@ -275,7 +294,11 @@ class Status {
static const char* CopyState(const char* s);
};
inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_) {
inline Status::Status(const Status& s) : code_(s.code_), subcode_(s.subcode_), sev_(s.sev_) {
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status::Status(const Status& s, Severity sev)
: code_(s.code_), subcode_(s.subcode_), sev_(sev) {
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
inline Status& Status::operator=(const Status& s) {
@ -284,6 +307,7 @@ inline Status& Status::operator=(const Status& s) {
if (this != &s) {
code_ = s.code_;
subcode_ = s.subcode_;
sev_ = s.sev_;
free((void *) state_);
state_ = (s.state_ == nullptr) ? nullptr : CopyState(s.state_);
}
@ -308,6 +332,8 @@ inline Status& Status::operator=(Status&& s)
s.code_ = kOk;
subcode_ = std::move(s.subcode_);
s.subcode_ = kNone;
sev_ = std::move(s.sev_);
s.sev_ = kNoError;
free((void *)state_);
state_ = nullptr;
std::swap(state_, s.state_);

@ -25,6 +25,7 @@ LIB_SOURCES = \
db/db_info_dumper.cc \
db/db_iter.cc \
db/dbformat.cc \
db/error_handler.cc \
db/event_helpers.cc \
db/experimental.cc \
db/external_sst_file_ingestion_job.cc \
@ -293,6 +294,7 @@ MAIN_SOURCES = \
db/dbformat_test.cc \
db/deletefile_test.cc \
db/env_timed_test.cc \
db/error_handler_test.cc \
db/external_sst_file_basic_test.cc \
db/external_sst_file_test.cc \
db/fault_injection_test.cc \

@ -121,7 +121,7 @@ TestWritableFile::~TestWritableFile() {
Status TestWritableFile::Append(const Slice& data) {
if (!env_->IsFilesystemActive()) {
return Status::Corruption("Not Active");
return env_->GetError();
}
Status s = target_->Append(data);
if (s.ok()) {
@ -172,7 +172,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
unique_ptr<WritableFile>* result,
const EnvOptions& soptions) {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
return GetError();
}
// Not allow overwriting files
Status s = target()->FileExists(fname);
@ -199,7 +199,7 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
return GetError();
}
Status s = EnvWrapper::DeleteFile(f);
if (!s.ok()) {
@ -216,7 +216,7 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) {
Status FaultInjectionTestEnv::RenameFile(const std::string& s,
const std::string& t) {
if (!IsFilesystemActive()) {
return Status::Corruption("Not Active");
return GetError();
}
Status ret = EnvWrapper::RenameFile(s, t);

@ -145,12 +145,20 @@ class FaultInjectionTestEnv : public EnvWrapper {
MutexLock l(&mutex_);
return filesystem_active_;
}
void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; }
void SetFilesystemActive(bool active) {
void SetFilesystemActiveNoLock(bool active,
Status error = Status::Corruption("Not active")) {
filesystem_active_ = active;
if (!active) {
error_ = error;
}
}
void SetFilesystemActive(bool active,
Status error = Status::Corruption("Not active")) {
MutexLock l(&mutex_);
SetFilesystemActiveNoLock(active);
SetFilesystemActiveNoLock(active, error);
}
void AssertNoOpenFile() { assert(open_files_.empty()); }
Status GetError() { return error_; }
private:
port::Mutex mutex_;
@ -159,6 +167,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
std::unordered_map<std::string, std::set<std::string>>
dir_to_new_files_since_last_sync_;
bool filesystem_active_; // Record flushes, syncs, writes
Status error_;
};
} // namespace rocksdb

@ -24,7 +24,7 @@ const char* Status::CopyState(const char* state) {
}
Status::Status(Code _code, SubCode _subcode, const Slice& msg, const Slice& msg2)
: code_(_code), subcode_(_subcode) {
: code_(_code), subcode_(_subcode), sev_(kNoError) {
assert(code_ != kOk);
assert(subcode_ != kMaxSubCode);
const size_t len1 = msg.size();

@ -15,7 +15,8 @@ const char* Status::msgs[] = {
"No space left on device", // kNoSpace
"Deadlock", // kDeadlock
"Stale file handle", // kStaleFile
"Memory limit reached" // kMemoryLimit
"Memory limit reached", // kMemoryLimit
"Space limit reached" // kSpaceLimit
};
} // namespace rocksdb

Loading…
Cancel
Save