Auto recovery from out of space errors (#4164)

Summary:
This commit implements automatic recovery from a Status::NoSpace() error
during background operations such as write callback, flush and
compaction. The broad design is as follows -
1. Compaction errors are treated as soft errors and don't put the
database in read-only mode. A compaction is delayed until enough free
disk space is available to accomodate the compaction outputs, which is
estimated based on the input size. This means that users can continue to
write, and we rely on the WriteController to delay or stop writes if the
compaction debt becomes too high due to persistent low disk space
condition
2. Errors during write callback and flush are treated as hard errors,
i.e the database is put in read-only mode and goes back to read-write
only fater certain recovery actions are taken.
3. Both types of recovery rely on the SstFileManagerImpl to poll for
sufficient disk space. We assume that there is a 1-1 mapping between an
SFM and the underlying OS storage container. For cases where multiple
DBs are hosted on a single storage container, the user is expected to
allocate a single SFM instance and use the same one for all the DBs. If
no SFM is specified by the user, DBImpl::Open() will allocate one, but
this will be one per DB and each DB will recover independently. The
recovery implemented by SFM is as follows -
  a) On the first occurance of an out of space error during compaction,
subsequent
  compactions will be delayed until the disk free space check indicates
  enough available space. The required space is computed as the sum of
  input sizes.
  b) The free space check requirement will be removed once the amount of
  free space is greater than the size reserved by in progress
  compactions when the first error occured
  c) If the out of space error is a hard error, a background thread in
  SFM will poll for sufficient headroom before triggering the recovery
  of the database and putting it in write-only mode. The headroom is
  calculated as the sum of the write_buffer_size of all the DB instances
  associated with the SFM
4. EventListener callbacks will be called at the start and completion of
automatic recovery. Users can disable the auto recov ery in the start
callback, and later initiate it manually by calling DB::Resume()

Todo:
1. More extensive testing
2. Add disk full condition to db_stress (follow-on PR)
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4164

Differential Revision: D9846378

Pulled By: anand1976

fbshipit-source-id: 80ea875dbd7f00205e19c82215ff6e37da10da4a
main
Anand Ananthabhotla 6 years ago committed by Facebook Github Bot
parent 3db584059c
commit a27fce408e
  1. 2
      db/compaction_job_test.cc
  2. 101
      db/db_impl.cc
  3. 19
      db/db_impl.h
  4. 132
      db/db_impl_compaction_flush.cc
  5. 2
      db/db_impl_debug.cc
  6. 23
      db/db_impl_open.cc
  7. 28
      db/db_impl_write.cc
  8. 183
      db/error_handler.cc
  9. 51
      db/error_handler.h
  10. 571
      db/error_handler_test.cc
  11. 29
      db/event_helpers.cc
  12. 5
      db/event_helpers.h
  13. 2
      db/flush_job.cc
  14. 5
      db/listener_test.cc
  15. 13
      env/env_posix.cc
  16. 1
      env/posix_logger.h
  17. 9
      include/rocksdb/env.h
  18. 17
      include/rocksdb/listener.h
  19. 53
      tools/db_bench_tool.cc
  20. 2
      util/delete_scheduler_test.cc
  21. 10
      util/fault_injection_test_env.h
  22. 268
      util/sst_file_manager_impl.cc
  23. 51
      util/sst_file_manager_impl.h

@ -79,7 +79,7 @@ class CompactionJobTest : public testing::Test {
shutting_down_(false),
preserve_deletes_seqnum_(0),
mock_table_factory_(new mock::MockTableFactory()),
error_handler_(db_options_, &mutex_) {
error_handler_(nullptr, db_options_, &mutex_) {
EXPECT_OK(env_->CreateDirIfMissing(dbname_));
db_options_.db_paths.emplace_back(dbname_,
std::numeric_limits<uint64_t>::max());

@ -215,9 +215,10 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
// requires a custom gc for compaction, we use that to set use_custom_gc_
// as well.
use_custom_gc_(seq_per_batch),
shutdown_initiated_(false),
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(immutable_db_options_, &mutex_) {
error_handler_(this, immutable_db_options_, &mutex_) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
assert(batch_per_txn_ || seq_per_batch_);
@ -259,16 +260,62 @@ Status DBImpl::Resume() {
return Status::OK();
}
Status s = error_handler_.GetBGError();
if (s.severity() > Status::Severity::kHardError) {
if (error_handler_.IsRecoveryInProgress()) {
// Don't allow a mix of manual and automatic recovery
return Status::Busy();
}
mutex_.Unlock();
Status s = error_handler_.RecoverFromBGError(true);
mutex_.Lock();
return s;
}
// This function implements the guts of recovery from a background error. It
// is eventually called for both manual as well as automatic recovery. It does
// the following -
// 1. Wait for currently scheduled background flush/compaction to exit, in
// order to inadvertently causing an error and thinking recovery failed
// 2. Flush memtables if there's any data for all the CFs. This may result
// another error, which will be saved by error_handler_ and reported later
// as the recovery status
// 3. Find and delete any obsolete files
// 4. Schedule compactions if needed for all the CFs. This is needed as the
// flush in the prior step might have been a no-op for some CFs, which
// means a new super version wouldn't have been installed
Status DBImpl::ResumeImpl() {
mutex_.AssertHeld();
WaitForBackgroundWork();
Status bg_error = error_handler_.GetBGError();
Status s;
if (shutdown_initiated_) {
// Returning shutdown status to SFM during auto recovery will cause it
// to abort the recovery and allow the shutdown to progress
s = Status::ShutdownInProgress();
}
if (s.ok() && bg_error.severity() > Status::Severity::kHardError) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Fatal/Unrecoverable error");
return s;
s = bg_error;
}
// We cannot guarantee consistency of the WAL. So force flush Memtables of
// all the column families
if (s.ok()) {
s = FlushAllCFs(FlushReason::kErrorRecovery);
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Flush failure [%s]",
s.ToString().c_str());
}
}
JobContext job_context(0);
FindObsoleteFiles(&job_context, true);
error_handler_.ClearBGError();
if (s.ok()) {
s = error_handler_.ClearBGError();
}
mutex_.Unlock();
job_context.manifest_file_number = 1;
@ -277,13 +324,36 @@ Status DBImpl::Resume() {
}
job_context.Clean();
if (s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
}
mutex_.Lock();
// Check for shutdown again before scheduling further compactions,
// since we released and re-acquired the lock above
if (shutdown_initiated_) {
s = Status::ShutdownInProgress();
}
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}
// Wake up any waiters - in this case, it could be the shutdown thread
bg_cv_.SignalAll();
// No need to check BGError again. If something happened, event listener would be
// notified and the operation causing it would have failed
return Status::OK();
return s;
}
void DBImpl::WaitForBackgroundWork() {
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait();
}
}
// Will lock the mutex_, will wait for completion if wait is true
@ -313,14 +383,20 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
if (!wait) {
return;
}
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_) {
bg_cv_.Wait();
}
WaitForBackgroundWork();
}
Status DBImpl::CloseHelper() {
// Guarantee that there is no background error recovery in progress before
// continuing with the shutdown
mutex_.Lock();
shutdown_initiated_ = true;
error_handler_.CancelErrorRecovery();
while (error_handler_.IsRecoveryInProgress()) {
bg_cv_.Wait();
}
mutex_.Unlock();
// CancelAllBackgroundWork called with false means we just set the shutdown
// marker. After this we do a variant of the waiting and unschedule work
// (to consider: moving all the waiting into CancelAllBackgroundWork(true))
@ -338,7 +414,8 @@ Status DBImpl::CloseHelper() {
// Wait for background work to finish
while (bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ || bg_purge_scheduled_ ||
pending_purge_obsolete_files_) {
pending_purge_obsolete_files_ ||
error_handler_.IsRecoveryInProgress()) {
TEST_SYNC_POINT("DBImpl::~DBImpl:WaitJob");
bg_cv_.Wait();
}

@ -795,6 +795,7 @@ class DBImpl : public DB {
private:
friend class DB;
friend class ErrorHandler;
friend class InternalStats;
friend class PessimisticTransaction;
friend class TransactionBaseImpl;
@ -845,6 +846,8 @@ class DBImpl : public DB {
bool read_only = false, bool error_if_log_file_exist = false,
bool error_if_data_exists_in_logs = false);
Status ResumeImpl();
void MaybeIgnoreError(Status* s) const;
const Status CreateArchivalDirectory();
@ -1046,9 +1049,10 @@ class DBImpl : public DB {
LogBuffer* log_buffer,
PrepickedCompaction* prepicked_compaction);
Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
LogBuffer* log_buffer);
LogBuffer* log_buffer, FlushReason* reason);
bool EnoughRoomForCompaction(const std::vector<CompactionInputFiles>& inputs,
bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
const std::vector<CompactionInputFiles>& inputs,
bool* sfm_bookkeeping, LogBuffer* log_buffer);
void PrintStatistics();
@ -1082,6 +1086,10 @@ class DBImpl : public DB {
Status CloseHelper();
Status FlushAllCFs(FlushReason flush_reason);
void WaitForBackgroundWork();
// table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_;
@ -1526,6 +1534,13 @@ class DBImpl : public DB {
// flush/compaction and if it is not provided vis SnapshotChecker, we should
// disable gc to be safe.
const bool use_custom_gc_;
// Flag to indicate that the DB instance shutdown has been initiated. This
// different from shutting_down_ atomic in that it is set at the beginning
// of shutdown sequence, specifically in order to prevent any background
// error recovery from going on in parallel. The latter, shutting_down_,
// is set a little later during the shutdown after scheduling memtable
// flushes
bool shutdown_initiated_;
// Clients must periodically call SetPreserveDeletesSequenceNumber()
// to advance this seqnum. Default value is 0 which means ALL deletes are

@ -26,7 +26,7 @@
namespace rocksdb {
bool DBImpl::EnoughRoomForCompaction(
const std::vector<CompactionInputFiles>& inputs,
ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
bool* sfm_reserved_compact_space, LogBuffer* log_buffer) {
// Check if we have enough room to do the compaction
bool enough_room = true;
@ -34,12 +34,17 @@ bool DBImpl::EnoughRoomForCompaction(
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
enough_room = sfm->EnoughRoomForCompaction(inputs);
// Pass the current bg_error_ to SFM so it can decide what checks to
// perform. If this DB instance hasn't seen any error yet, the SFM can be
// optimistic and not do disk space checks
enough_room =
sfm->EnoughRoomForCompaction(cfd, inputs, error_handler_.GetBGError());
if (enough_room) {
*sfm_reserved_compact_space = true;
}
}
#else
(void)cfd;
(void)inputs;
(void)sfm_reserved_compact_space;
#endif // ROCKSDB_LITE
@ -584,7 +589,7 @@ Status DBImpl::CompactFilesImpl(
bool sfm_reserved_compact_space = false;
// First check if we have enough room to do the compaction
bool enough_room = EnoughRoomForCompaction(
input_files, &sfm_reserved_compact_space, log_buffer);
cfd, input_files, &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
// m's vars will get set properly at the end of this function,
@ -914,6 +919,68 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
return s;
}
Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
Status s;
WriteContext context;
WriteThread::Writer w;
mutex_.AssertHeld();
write_thread_.EnterUnbatched(&w, &mutex_);
FlushRequest flush_req;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
cached_recoverable_state_empty_.load()) {
// Nothing to flush
continue;
}
// SwitchMemtable() will release and reacquire mutex during execution
s = SwitchMemtable(cfd, &context);
if (!s.ok()) {
break;
}
cfd->imm()->FlushRequested();
flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
}
// schedule flush
if (s.ok() && !flush_req.empty()) {
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
}
write_thread_.ExitUnbatched(&w);
if (s.ok()) {
for (auto& flush : flush_req) {
auto cfd = flush.first;
auto flush_memtable_id = flush.second;
while (cfd->imm()->NumNotFlushed() > 0 &&
cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
if (!error_handler_.GetRecoveryError().ok()) {
break;
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF, if we did not break here
// we will loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
continue;
}
cfd->Ref();
bg_cv_.Wait();
cfd->Unref();
}
}
}
flush_req.clear();
return s;
}
Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
int output_level, uint32_t output_path_id,
uint32_t max_subcompactions,
@ -1236,6 +1303,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
if (bg_work_paused_ > 0) {
// we paused the background work
return;
} else if (error_handler_.IsBGWorkStopped() &&
!error_handler_.IsRecoveryInProgress()) {
// There has been a hard error and this call is not part of the recovery
// sequence. Bail out here so we don't get into an endless loop of
// scheduling BG work which will again call this function
return;
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
return;
@ -1263,6 +1336,12 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
if (bg_compaction_paused_ > 0) {
// we paused the background compaction
return;
} else if (error_handler_.IsBGWorkStopped()) {
// Compaction is not part of the recovery sequence from a hard error. We
// might get here because recovery might do a flush and install a new
// super version, which will try to schedule pending compactions. Bail
// out here and let the higher level recovery handle compactions
return;
}
if (HasExclusiveManualCompaction()) {
@ -1420,15 +1499,18 @@ void DBImpl::UnscheduleCallback(void* arg) {
}
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer) {
LogBuffer* log_buffer, FlushReason* reason) {
mutex_.AssertHeld();
Status status;
*reason = FlushReason::kOthers;
// If BG work is stopped due to an error, but a recovery is in progress,
// that means this flush is part of the recovery. So allow it to go through
if (!error_handler_.IsBGWorkStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
status = Status::ShutdownInProgress();
}
} else {
} else if (!error_handler_.IsRecoveryInProgress()) {
status = error_handler_.GetBGError();
}
@ -1479,6 +1561,9 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
}
status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
// All the CFDs in the FlushReq must have the same flush reason, so just
// grab the first one
*reason = bg_flush_args[0].cfd_->GetFlushReason();
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
if (cfd->Unref()) {
@ -1505,9 +1590,12 @@ void DBImpl::BackgroundCallFlush() {
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
FlushReason reason;
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) {
Status s =
BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason);
if (!s.ok() && !s.IsShutdownInProgress() &&
reason != FlushReason::kErrorRecovery) {
// Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to
// chew up resources for failed flushes for the duration of
@ -1697,6 +1785,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
} else {
status = error_handler_.GetBGError();
// If we get here, it means a hard error happened after this compaction
// was scheduled by MaybeScheduleFlushOrCompaction(), but before it got
// a chance to execute. Since we didn't pop a cfd from the compaction
// queue, increment unscheduled_compactions_
unscheduled_compactions_++;
}
if (!status.ok()) {
@ -1732,7 +1825,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} else {
// First check if we have enough room to do the compaction
bool enough_room = EnoughRoomForCompaction(
*(c->inputs()), &sfm_reserved_compact_space, log_buffer);
m->cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
// Then don't do the compaction
@ -1795,7 +1888,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (c != nullptr) {
bool enough_room = EnoughRoomForCompaction(
*(c->inputs()), &sfm_reserved_compact_space, log_buffer);
cfd, *(c->inputs()), &sfm_reserved_compact_space, log_buffer);
if (!enough_room) {
// Then don't do the compaction
@ -2004,8 +2097,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
NotifyOnCompactionCompleted(c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
}
// this will unref its input_version and column_family_data
c.reset();
if (status.ok() || status.IsCompactionTooLarge()) {
// Done
@ -2015,7 +2106,26 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
// Put this cfd back in the compaction queue so we can retry after some
// time
auto cfd = c->column_family_data();
assert(cfd != nullptr);
// Since this compaction failed, we need to recompute the score so it
// takes the original input files into account
c->column_family_data()
->current()
->storage_info()
->ComputeCompactionScore(*(c->immutable_cf_options()),
*(c->mutable_cf_options()));
if (!cfd->queued_for_compaction()) {
AddToCompactionQueue(cfd);
++unscheduled_compactions_;
}
}
}
// this will unref its input_version and column_family_data
c.reset();
if (is_manual) {
ManualCompactionState* m = manual_compaction;

@ -137,7 +137,7 @@ Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {
while ((bg_bottom_compaction_scheduled_ || bg_compaction_scheduled_ ||
bg_flush_scheduled_ ||
(wait_unscheduled && unscheduled_compactions_)) &&
!error_handler_.IsDBStopped()) {
(error_handler_.GetBGError() == Status::OK())) {
bg_cv_.Wait();
}
return error_handler_.GetBGError();

@ -134,8 +134,15 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
for (size_t i = 0; i < result.db_paths.size(); i++) {
DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
}
#endif
// Create a default SstFileManager for purposes of tracking compaction size
// and facilitating recovery from out of space errors.
if (result.sst_file_manager.get() == nullptr) {
std::shared_ptr<SstFileManager> sst_file_manager(
NewSstFileManager(result.env, result.info_log));
result.sst_file_manager = sst_file_manager;
}
#endif
return result;
}
@ -1050,6 +1057,12 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
break;
}
}
// For recovery from NoSpace() error, we can only handle
// the case where the database is stored in a single path
if (paths.size() <= 1) {
impl->error_handler_.EnableAutoRecovery();
}
}
if (!s.ok()) {
@ -1213,6 +1226,14 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
}
}
}
// Reserve some disk buffer space. This is a heuristic - when we run out
// of disk space, this ensures that there is atleast write_buffer_size
// amount of free space before we resume DB writes. In low disk space
// conditions, we want to avoid a lot of small L0 files due to frequent
// WAL write failures and resultant forced flushes
sfm->ReserveDiskBuffer(max_write_buffer_size,
impl->immutable_db_options_.db_paths[0].path);
}
#endif // !ROCKSDB_LITE

@ -710,6 +710,10 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
assert(write_context != nullptr && need_log_sync != nullptr);
Status status;
if (error_handler_.IsDBStopped()) {
status = error_handler_.GetBGError();
}
PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time);
assert(!single_column_family_mode_ ||
@ -728,10 +732,6 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
status = HandleWriteBufferFull(write_context);
}
if (UNLIKELY(status.ok())) {
status = error_handler_.GetBGError();
}
if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) {
status = ScheduleFlushes(write_context);
}
@ -1184,7 +1184,11 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
mutex_.Lock();
}
while (!error_handler_.IsDBStopped() && write_controller_.IsStopped()) {
// Don't wait if there's a background error, even if its a soft error. We
// might wait here indefinitely as the background compaction may never
// finish successfully, resulting in the stall condition lasting
// indefinitely
while (error_handler_.GetBGError().ok() && write_controller_.IsStopped()) {
if (write_options.no_slowdown) {
return Status::Incomplete();
}
@ -1200,7 +1204,19 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
RecordTick(stats_, STALL_MICROS, time_delayed);
}
return error_handler_.GetBGError();
// If DB is not in read-only mode and write_controller is not stopping
// writes, we can ignore any background errors and allow the write to
// proceed
Status s;
if (write_controller_.IsStopped()) {
// If writes are still stopped, it means we bailed due to a background
// error
s = Status::Incomplete(error_handler_.GetBGError().ToString());
}
if (error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
return s;
}
Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,

@ -4,7 +4,9 @@
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/error_handler.h"
#include "db/db_impl.h"
#include "db/event_helpers.h"
#include "util/sst_file_manager_impl.h"
namespace rocksdb {
@ -33,7 +35,7 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
// Errors during BG flush
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::SubCode::kNoSpace, true),
Status::Severity::kSoftError},
Status::Severity::kHardError},
{std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::SubCode::kNoSpace, false),
Status::Severity::kNoError},
@ -44,11 +46,11 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kFatalError},
Status::Severity::kHardError},
{std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kFatalError},
Status::Severity::kHardError},
};
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity>
@ -118,6 +120,45 @@ std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
Status::Severity::kFatalError},
};
void ErrorHandler::CancelErrorRecovery() {
#ifndef ROCKSDB_LITE
db_mutex_->AssertHeld();
// We'll release the lock before calling sfm, so make sure no new
// recovery gets scheduled at that point
auto_recovery_ = false;
SstFileManagerImpl* sfm = reinterpret_cast<SstFileManagerImpl*>(
db_options_.sst_file_manager.get());
if (sfm) {
// This may or may not cancel a pending recovery
db_mutex_->Unlock();
bool cancelled = sfm->CancelErrorRecovery(this);
db_mutex_->Lock();
if (cancelled) {
recovery_in_prog_ = false;
}
}
#endif
}
// This is the main function for looking at an error during a background
// operation and deciding the severity, and error recovery strategy. The high
// level algorithm is as follows -
// 1. Classify the severity of the error based on the ErrorSeverityMap,
// DefaultErrorSeverityMap and DefaultReasonMap defined earlier
// 2. Call a Status code specific override function to adjust the severity
// if needed. The reason for this is our ability to recover may depend on
// the exact options enabled in DBOptions
// 3. Determine if auto recovery is possible. A listener notification callback
// is called, which can disable the auto recovery even if we decide its
// feasible
// 4. For Status::NoSpace() errors, rely on SstFileManagerImpl to control
// the actual recovery. If no sst file manager is specified in DBOptions,
// a default one is allocated during DB::Open(), so there will always be
// one.
// This can also get called as part of a recovery operation. In that case, we
// also track the error seperately in recovery_error_ so we can tell in the
// end whether recovery succeeded or not
Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reason) {
db_mutex_->AssertHeld();
@ -125,6 +166,12 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
return Status::OK();
}
// Check if recovery is currently in progress. If it is, we will save this
// error so we can check it at the end to see if recovery succeeded or not
if (recovery_in_prog_ && recovery_error_.ok()) {
recovery_error_ = bg_err;
}
bool paranoid = db_options_.paranoid_checks;
Status::Severity sev = Status::Severity::kFatalError;
Status new_bg_err;
@ -156,15 +203,143 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
}
new_bg_err = Status(bg_err, sev);
bool auto_recovery = auto_recovery_;
if (new_bg_err.severity() >= Status::Severity::kFatalError && auto_recovery) {
auto_recovery = false;
;
}
// Allow some error specific overrides
if (new_bg_err == Status::NoSpace()) {
new_bg_err = OverrideNoSpaceError(new_bg_err, &auto_recovery);
}
if (!new_bg_err.ok()) {
Status s = new_bg_err;
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s, db_mutex_);
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s,
db_mutex_, &auto_recovery);
if (!s.ok() && (s.severity() > bg_error_.severity())) {
bg_error_ = s;
} else {
// This error is less severe than previously encountered error. Don't
// take any further action
return bg_error_;
}
}
if (auto_recovery) {
recovery_in_prog_ = true;
// Kick-off error specific recovery
if (bg_error_ == Status::NoSpace()) {
RecoverFromNoSpace();
}
}
return bg_error_;
}
Status ErrorHandler::OverrideNoSpaceError(Status bg_error,
bool* auto_recovery) {
#ifndef ROCKSDB_LITE
if (bg_error.severity() >= Status::Severity::kFatalError) {
return bg_error;
}
if (db_options_.sst_file_manager.get() == nullptr) {
// We rely on SFM to poll for enough disk space and recover
*auto_recovery = false;
return bg_error;
}
if (db_options_.allow_2pc &&
(bg_error.severity() <= Status::Severity::kSoftError)) {
// Don't know how to recover, as the contents of the current WAL file may
// be inconsistent, and it may be needed for 2PC. If 2PC is not enabled,
// we can just flush the memtable and discard the log
*auto_recovery = false;
return Status(bg_error, Status::Severity::kFatalError);
}
{
uint64_t free_space;
if (db_options_.env->GetFreeSpace(db_options_.db_paths[0].path,
&free_space) == Status::NotSupported()) {
*auto_recovery = false;
}
}
return bg_error;
#else
(void)auto_recovery;
return Status(bg_error, Status::Severity::kFatalError);
#endif
}
void ErrorHandler::RecoverFromNoSpace() {
#ifndef ROCKSDB_LITE
SstFileManagerImpl* sfm =
reinterpret_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
// Inform SFM of the error, so it can kick-off the recovery
if (sfm) {
sfm->StartErrorRecovery(this, bg_error_);
}
#endif
}
Status ErrorHandler::ClearBGError() {
#ifndef ROCKSDB_LITE
db_mutex_->AssertHeld();
// Signal that recovery succeeded
if (recovery_error_.ok()) {
Status old_bg_error = bg_error_;
bg_error_ = Status::OK();
recovery_in_prog_ = false;
EventHelpers::NotifyOnErrorRecoveryCompleted(db_options_.listeners,
old_bg_error, db_mutex_);
}
return recovery_error_;
#else
return bg_error_;
#endif
}
Status ErrorHandler::RecoverFromBGError(bool is_manual) {
#ifndef ROCKSDB_LITE
InstrumentedMutexLock l(db_mutex_);
if (is_manual) {
// If its a manual recovery and there's a background recovery in progress
// return busy status
if (recovery_in_prog_) {
return Status::Busy();
}
recovery_in_prog_ = true;
}
if (bg_error_.severity() == Status::Severity::kSoftError) {
// Simply clear the background error and return
recovery_error_ = Status::OK();
return ClearBGError();
}
// Reset recovery_error_. We will use this to record any errors that happen
// during the recovery process. While recovering, the only operations that
// can generate background errors should be the flush operations
recovery_error_ = Status::OK();
Status s = db_->ResumeImpl();
// For manual recover, shutdown, and fatal error cases, set
// recovery_in_prog_ to false. For automatic background recovery, leave it
// as is regardless of success or failure as it will be retried
if (is_manual || s.IsShutdownInProgress() ||
bg_error_.severity() >= Status::Severity::kFatalError) {
recovery_in_prog_ = false;
}
return s;
#else
(void)is_manual;
return bg_error_;
#endif
}
}

@ -11,42 +11,65 @@
namespace rocksdb {
class DBImpl;
class ErrorHandler {
public:
ErrorHandler(const ImmutableDBOptions& db_options,
ErrorHandler(DBImpl* db, const ImmutableDBOptions& db_options,
InstrumentedMutex* db_mutex)
: db_options_(db_options),
: db_(db),
db_options_(db_options),
bg_error_(Status::OK()),
db_mutex_(db_mutex)
{}
recovery_error_(Status::OK()),
db_mutex_(db_mutex),
auto_recovery_(false),
recovery_in_prog_(false) {}
~ErrorHandler() {}
void EnableAutoRecovery() { auto_recovery_ = true; }
Status::Severity GetErrorSeverity(BackgroundErrorReason reason,
Status::Code code, Status::SubCode subcode);
Status::Code code,
Status::SubCode subcode);
Status SetBGError(const Status& bg_err, BackgroundErrorReason reason);
Status GetBGError()
{
return bg_error_;
}
Status GetBGError() { return bg_error_; }
void ClearBGError() {
bg_error_ = Status::OK();
}
Status GetRecoveryError() { return recovery_error_; }
Status ClearBGError();
bool IsDBStopped() {
return !bg_error_.ok();
return !bg_error_.ok() &&
bg_error_.severity() >= Status::Severity::kHardError;
}
bool IsBGWorkStopped() {
return !bg_error_.ok();
return !bg_error_.ok() &&
(bg_error_.severity() >= Status::Severity::kHardError ||
!auto_recovery_);
}
bool IsRecoveryInProgress() { return recovery_in_prog_; }
Status RecoverFromBGError(bool is_manual = false);
void CancelErrorRecovery();
private:
DBImpl* db_;
const ImmutableDBOptions& db_options_;
Status bg_error_;
// A seperate Status variable used to record any errors during the
// recovery process from hard errors
Status recovery_error_;
InstrumentedMutex* db_mutex_;
// A flag indicating whether automatic recovery from errors is enabled
bool auto_recovery_;
bool recovery_in_prog_;
Status OverrideNoSpaceError(Status bg_error, bool* auto_recovery);
void RecoverFromNoSpace();
};
}

@ -6,9 +6,12 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/sst_file_manager.h"
#include "util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
#include "util/sync_point.h"
@ -33,36 +36,137 @@ class DBErrorHandlingEnv : public EnvWrapper {
bool trig_io_error;
};
class ErrorHandlerListener : public EventListener {
public:
ErrorHandlerListener()
: mutex_(),
cv_(&mutex_),
no_auto_recovery_(false),
recovery_complete_(false),
file_creation_started_(false),
override_bg_error_(false),
file_count_(0),
fault_env_(nullptr) {}
void OnTableFileCreationStarted(const TableFileCreationBriefInfo& /*ti*/) {
InstrumentedMutexLock l(&mutex_);
file_creation_started_ = true;
if (file_count_ > 0) {
if (--file_count_ == 0) {
fault_env_->SetFilesystemActive(false, file_creation_error_);
file_creation_error_ = Status::OK();
}
}
cv_.SignalAll();
}
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/, bool* auto_recovery) {
if (*auto_recovery && no_auto_recovery_) {
*auto_recovery = false;
}
}
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
InstrumentedMutexLock l(&mutex_);
recovery_complete_ = true;
cv_.SignalAll();
}
bool WaitForRecovery(uint64_t /*abs_time_us*/) {
InstrumentedMutexLock l(&mutex_);
while (!recovery_complete_) {
cv_.Wait(/*abs_time_us*/);
}
if (recovery_complete_) {
recovery_complete_ = false;
return true;
}
return false;
}
void WaitForTableFileCreationStarted(uint64_t /*abs_time_us*/) {
InstrumentedMutexLock l(&mutex_);
while (!file_creation_started_) {
cv_.Wait(/*abs_time_us*/);
}
file_creation_started_ = false;
}
void OnBackgroundError(BackgroundErrorReason /*reason*/,
Status* bg_error) override {
if (override_bg_error_) {
*bg_error = bg_error_;
override_bg_error_ = false;
}
}
void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
void OverrideBGError(Status bg_err) {
bg_error_ = bg_err;
override_bg_error_ = true;
}
void InjectFileCreationError(FaultInjectionTestEnv* env, int file_count,
Status s) {
fault_env_ = env;
file_count_ = file_count;
file_creation_error_ = s;
}
private:
InstrumentedMutex mutex_;
InstrumentedCondVar cv_;
bool no_auto_recovery_;
bool recovery_complete_;
bool file_creation_started_;
bool override_bg_error_;
int file_count_;
Status file_creation_error_;
Status bg_error_;
FaultInjectionTestEnv* fault_env_;
};
TEST_F(DBErrorHandlingTest, FLushWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery(false);
DestroyAndReopen(options);
Put(Key(0), "va;");
Put(Key(0), "val");
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::Start", [&](void *) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
});
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError);
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_env->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Reopen(options);
ASSERT_EQ("val", Get(Key(0)));
Destroy(options);
}
TEST_F(DBErrorHandlingTest, CompactionWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(listener);
options.env = fault_env.get();
Status s;
DestroyAndReopen(options);
@ -72,6 +176,10 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) {
s = Flush();
ASSERT_EQ(s, Status::OK());
listener->OverrideBGError(
Status(Status::NoSpace(), Status::Severity::kHardError)
);
listener->EnableAutoRecovery(false);
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"FlushMemTableFinished", "BackgroundCallCompaction:0"}});
rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -85,7 +193,7 @@ TEST_F(DBErrorHandlingTest, CompactionWriteError) {
ASSERT_EQ(s, Status::OK());
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kSoftError);
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
fault_env->SetFilesystemActive(true);
s = dbfull()->Resume();
@ -129,6 +237,453 @@ TEST_F(DBErrorHandlingTest, CorruptionError) {
Destroy(options);
}
#ifndef TRAVIS
TEST_F(DBErrorHandlingTest, AutoRecoverFlushError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery();
DestroyAndReopen(options);
Put(Key(0), "val");
SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
});
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_env->SetFilesystemActive(true);
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
s = Put(Key(1), "val");
ASSERT_EQ(s, Status::OK());
Reopen(options);
ASSERT_EQ("val", Get(Key(0)));
ASSERT_EQ("val", Get(Key(1)));
Destroy(options);
}
TEST_F(DBErrorHandlingTest, FailRecoverFlushError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery();
DestroyAndReopen(options);
Put(Key(0), "val");
SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
});
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), rocksdb::Status::Severity::kHardError);
// We should be able to shutdown the database while auto recovery is going
// on in the background
Close();
DestroyDB(dbname_, options);
}
TEST_F(DBErrorHandlingTest, WALWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.writable_file_max_buffer_size = 32768;
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery();
DestroyAndReopen(options);
{
WriteBatch batch;
char val[1024];
for (auto i = 0; i<100; ++i) {
sprintf(val, "%d", i);
batch.Put(Key(i), Slice(val, sizeof(val)));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
};
{
WriteBatch batch;
char val[1024];
int write_error = 0;
for (auto i = 100; i<199; ++i) {
sprintf(val, "%d", i);
batch.Put(Key(i), Slice(val, sizeof(val)));
}
SyncPoint::GetInstance()->SetCallBack("WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
write_error++;
if (write_error > 2) {
fault_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
}
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wopts;
wopts.sync = true;
s = dbfull()->Write(wopts, &batch);
ASSERT_EQ(s, s.NoSpace());
}
SyncPoint::GetInstance()->DisableProcessing();
fault_env->SetFilesystemActive(true);
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
for (auto i=0; i<199; ++i) {
if (i < 100) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
}
Reopen(options);
for (auto i=0; i<199; ++i) {
if (i < 100) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
}
Close();
}
TEST_F(DBErrorHandlingTest, MultiCFWALWriteError) {
std::unique_ptr<FaultInjectionTestEnv> fault_env(
new FaultInjectionTestEnv(Env::Default()));
std::shared_ptr<ErrorHandlerListener> listener(new ErrorHandlerListener());
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.writable_file_max_buffer_size = 32768;
options.env = fault_env.get();
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery();
CreateAndReopenWithCF({"one", "two", "three"}, options);
{
WriteBatch batch;
char val[1024];
for (auto i = 1; i < 4; ++i) {
for (auto j = 0; j < 100; ++j) {
sprintf(val, "%d", j);
batch.Put(handles_[i], Key(j), Slice(val, sizeof(val)));
}
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
};
{
WriteBatch batch;
char val[1024];
int write_error = 0;
// Write to one CF
for (auto i = 100; i < 199; ++i) {
sprintf(val, "%d", i);
batch.Put(handles_[2], Key(i), Slice(val, sizeof(val)));
}
SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
write_error++;
if (write_error > 2) {
fault_env->SetFilesystemActive(false,
Status::NoSpace("Out of space"));
}
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wopts;
wopts.sync = true;
s = dbfull()->Write(wopts, &batch);
ASSERT_EQ(s, s.NoSpace());
}
SyncPoint::GetInstance()->DisableProcessing();
fault_env->SetFilesystemActive(true);
ASSERT_EQ(listener->WaitForRecovery(5000000), true);
for (auto i = 1; i < 4; ++i) {
// Every CF should have been flushed
ASSERT_EQ(NumTableFilesAtLevel(0, i), 1);
}
for (auto i = 1; i < 4; ++i) {
for (auto j = 0; j < 199; ++j) {
if (j < 100) {
ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
}
}
}
ReopenWithColumnFamilies({"default", "one", "two", "three"}, options);
for (auto i = 1; i < 4; ++i) {
for (auto j = 0; j < 199; ++j) {
if (j < 100) {
ASSERT_NE(Get(i, Key(j)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(i, Key(j)), "NOT_FOUND");
}
}
}
Close();
}
TEST_F(DBErrorHandlingTest, MultiDBCompactionError) {
FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
std::vector<Options> options;
std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
std::vector<DB*> db;
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
int kNumDbInstances = 3;
for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerListener());
options.emplace_back(GetDefaultOptions());
fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
options[i].create_if_missing = true;
options[i].level0_file_num_compaction_trigger = 2;
options[i].writable_file_max_buffer_size = 32768;
options[i].env = fault_env[i].get();
options[i].listeners.emplace_back(listener[i]);
options[i].sst_file_manager = sfm;
DB* dbptr;
char buf[16];
listener[i]->EnableAutoRecovery();
// Setup for returning error for the 3rd SST, which would be level 1
listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
Status::NoSpace("Out of space"));
snprintf(buf, sizeof(buf), "_%d", i);
DestroyDB(dbname_ + std::string(buf), options[i]);
ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
Status::OK());
db.emplace_back(dbptr);
}
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];
for (auto j = 0; j <= 100; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
}
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];
// Write to one CF
for (auto j = 100; j < 199; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
}
for (auto i = 0; i < kNumDbInstances; ++i) {
Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
fault_env[i]->SetFilesystemActive(true);
}
def_env->SetFilesystemActive(true);
for (auto i = 0; i < kNumDbInstances; ++i) {
std::string prop;
ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
EXPECT_TRUE(db[i]->GetProperty(
"rocksdb.num-files-at-level" + NumberToString(0), &prop));
EXPECT_EQ(atoi(prop.c_str()), 0);
EXPECT_TRUE(db[i]->GetProperty(
"rocksdb.num-files-at-level" + NumberToString(1), &prop));
EXPECT_EQ(atoi(prop.c_str()), 1);
}
for (auto i = 0; i < kNumDbInstances; ++i) {
char buf[16];
snprintf(buf, sizeof(buf), "_%d", i);
delete db[i];
fault_env[i]->SetFilesystemActive(true);
if (getenv("KEEP_DB")) {
printf("DB is still at %s%s\n", dbname_.c_str(), buf);
} else {
Status s = DestroyDB(dbname_ + std::string(buf), options[i]);
}
}
options.clear();
sfm.reset();
delete def_env;
}
TEST_F(DBErrorHandlingTest, MultiDBVariousErrors) {
FaultInjectionTestEnv* def_env = new FaultInjectionTestEnv(Env::Default());
std::vector<std::unique_ptr<FaultInjectionTestEnv>> fault_env;
std::vector<Options> options;
std::vector<std::shared_ptr<ErrorHandlerListener>> listener;
std::vector<DB*> db;
std::shared_ptr<SstFileManager> sfm(NewSstFileManager(def_env));
int kNumDbInstances = 3;
for (auto i = 0; i < kNumDbInstances; ++i) {
listener.emplace_back(new ErrorHandlerListener());
options.emplace_back(GetDefaultOptions());
fault_env.emplace_back(new FaultInjectionTestEnv(Env::Default()));
options[i].create_if_missing = true;
options[i].level0_file_num_compaction_trigger = 2;
options[i].writable_file_max_buffer_size = 32768;
options[i].env = fault_env[i].get();
options[i].listeners.emplace_back(listener[i]);
options[i].sst_file_manager = sfm;
DB* dbptr;
char buf[16];
listener[i]->EnableAutoRecovery();
switch (i) {
case 0:
// Setup for returning error for the 3rd SST, which would be level 1
listener[i]->InjectFileCreationError(fault_env[i].get(), 3,
Status::NoSpace("Out of space"));
break;
case 1:
// Setup for returning error after the 1st SST, which would result
// in a hard error
listener[i]->InjectFileCreationError(fault_env[i].get(), 2,
Status::NoSpace("Out of space"));
break;
default:
break;
}
snprintf(buf, sizeof(buf), "_%d", i);
DestroyDB(dbname_ + std::string(buf), options[i]);
ASSERT_EQ(DB::Open(options[i], dbname_ + std::string(buf), &dbptr),
Status::OK());
db.emplace_back(dbptr);
}
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];
for (auto j = 0; j <= 100; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
}
def_env->SetFilesystemActive(false, Status::NoSpace("Out of space"));
for (auto i = 0; i < kNumDbInstances; ++i) {
WriteBatch batch;
char val[1024];
// Write to one CF
for (auto j = 100; j < 199; ++j) {
sprintf(val, "%d", j);
batch.Put(Key(j), Slice(val, sizeof(val)));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(db[i]->Write(wopts, &batch), Status::OK());
if (i != 1) {
ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::OK());
} else {
ASSERT_EQ(db[i]->Flush(FlushOptions()), Status::NoSpace());
}
}
for (auto i = 0; i < kNumDbInstances; ++i) {
Status s = static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true);
switch (i) {
case 0:
ASSERT_EQ(s.severity(), Status::Severity::kSoftError);
break;
case 1:
ASSERT_EQ(s.severity(), Status::Severity::kHardError);
break;
case 2:
ASSERT_EQ(s, Status::OK());
break;
}
fault_env[i]->SetFilesystemActive(true);
}
def_env->SetFilesystemActive(true);
for (auto i = 0; i < kNumDbInstances; ++i) {
std::string prop;
if (i < 2) {
ASSERT_EQ(listener[i]->WaitForRecovery(5000000), true);
}
if (i == 1) {
ASSERT_EQ(static_cast<DBImpl*>(db[i])->TEST_WaitForCompact(true),
Status::OK());
}
EXPECT_TRUE(db[i]->GetProperty(
"rocksdb.num-files-at-level" + NumberToString(0), &prop));
EXPECT_EQ(atoi(prop.c_str()), 0);
EXPECT_TRUE(db[i]->GetProperty(
"rocksdb.num-files-at-level" + NumberToString(1), &prop));
EXPECT_EQ(atoi(prop.c_str()), 1);
}
for (auto i = 0; i < kNumDbInstances; ++i) {
char buf[16];
snprintf(buf, sizeof(buf), "_%d", i);
fault_env[i]->SetFilesystemActive(true);
delete db[i];
if (getenv("KEEP_DB")) {
printf("DB is still at %s%s\n", dbname_.c_str(), buf);
} else {
DestroyDB(dbname_ + std::string(buf), options[i]);
}
}
options.clear();
delete def_env;
}
#endif
} // namespace rocksdb
int main(int argc, char** argv) {
@ -136,3 +691,13 @@ int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int /*argc*/, char** /*argv*/) {
fprintf(stderr, "SKIPPED as Cuckoo table is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // ROCKSDB_LITE

@ -40,8 +40,8 @@ void EventHelpers::NotifyTableFileCreationStarted(
void EventHelpers::NotifyOnBackgroundError(
const std::vector<std::shared_ptr<EventListener>>& listeners,
BackgroundErrorReason reason, Status* bg_error,
InstrumentedMutex* db_mutex) {
BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
bool* auto_recovery) {
#ifndef ROCKSDB_LITE
if (listeners.size() == 0U) {
return;
@ -51,6 +51,9 @@ void EventHelpers::NotifyOnBackgroundError(
db_mutex->Unlock();
for (auto& listener : listeners) {
listener->OnBackgroundError(reason, bg_error);
if (*auto_recovery) {
listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
}
}
db_mutex->Lock();
#else
@ -58,6 +61,7 @@ void EventHelpers::NotifyOnBackgroundError(
(void)reason;
(void)bg_error;
(void)db_mutex;
(void)auto_recovery;
#endif // ROCKSDB_LITE
}
@ -167,4 +171,25 @@ void EventHelpers::LogAndNotifyTableFileDeletion(
#endif // !ROCKSDB_LITE
}
void EventHelpers::NotifyOnErrorRecoveryCompleted(
const std::vector<std::shared_ptr<EventListener>>& listeners,
Status old_bg_error, InstrumentedMutex* db_mutex) {
#ifndef ROCKSDB_LITE
if (listeners.size() == 0U) {
return;
}
db_mutex->AssertHeld();
// release lock while notifying events
db_mutex->Unlock();
for (auto& listener : listeners) {
listener->OnErrorRecoveryCompleted(old_bg_error);
}
db_mutex->Lock();
#else
(void)listeners;
(void)old_bg_error;
(void)db_mutex;
#endif // ROCKSDB_LITE
}
} // namespace rocksdb

@ -28,7 +28,7 @@ class EventHelpers {
static void NotifyOnBackgroundError(
const std::vector<std::shared_ptr<EventListener>>& listeners,
BackgroundErrorReason reason, Status* bg_error,
InstrumentedMutex* db_mutex);
InstrumentedMutex* db_mutex, bool* auto_recovery);
static void LogAndNotifyTableFileCreationFinished(
EventLogger* event_logger,
const std::vector<std::shared_ptr<EventListener>>& listeners,
@ -41,6 +41,9 @@ class EventHelpers {
uint64_t file_number, const std::string& file_path,
const Status& status, const std::string& db_name,
const std::vector<std::shared_ptr<EventListener>>& listeners);
static void NotifyOnErrorRecoveryCompleted(
const std::vector<std::shared_ptr<EventListener>>& listeners,
Status bg_error, InstrumentedMutex* db_mutex);
private:
static void LogAndNotifyTableFileCreation(

@ -78,6 +78,8 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
return "Auto Compaction";
case FlushReason::kManualFlush:
return "Manual Flush";
case FlushReason::kErrorRecovery:
return "Error Recovery";
default:
return "Invalid";
}

@ -882,10 +882,13 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
ASSERT_EQ(1, listener->counter());
// trigger flush so compaction is triggered again; this time it succeeds
// The previous failed compaction may get retried automatically, so we may
// be left with 0 or 1 files in level 1, depending on when the retry gets
// scheduled
ASSERT_OK(Put("key0", "val"));
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_LE(1, NumTableFilesAtLevel(0));
}
} // namespace rocksdb

13
env/env_posix.cc vendored

@ -25,6 +25,7 @@
#include <sys/syscall.h>
#include <sys/sysmacros.h>
#endif
#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
@ -776,6 +777,18 @@ class PosixEnv : public Env {
return gettid(pthread_self());
}
virtual Status GetFreeSpace(const std::string& fname,
uint64_t* free_space) override {
struct statvfs sbuf;
if (statvfs(fname.c_str(), &sbuf) < 0) {
return IOError("While doing statvfs", fname, errno);
}
*free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
return Status::OK();
}
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) override {
FILE* f;

@ -165,7 +165,6 @@ class PosixLogger : public Logger {
size_t sz = fwrite(base, 1, write_size, file_);
flush_pending_ = true;
assert(sz == write_size);
if (sz > 0) {
log_size_ += write_size;
}

@ -477,6 +477,15 @@ class Env {
// Returns the ID of the current thread.
virtual uint64_t GetThreadID() const;
// This seems to clash with a macro on Windows, so #undef it here
#undef GetFreeSpace
// Get the amount of free disk space
virtual Status GetFreeSpace(const std::string& /*path*/,
uint64_t* /*diskfree*/) {
return Status::NotSupported();
}
protected:
// The pointer to an internal structure that will update the
// status of each thread.

@ -27,6 +27,7 @@ enum class TableFileCreationReason {
kFlush,
kCompaction,
kRecovery,
kMisc,
};
struct TableFileCreationBriefInfo {
@ -103,6 +104,7 @@ enum class FlushReason : int {
kDeleteFiles = 0x08,
kAutoCompaction = 0x09,
kManualFlush = 0x0a,
kErrorRecovery = 0xb,
};
enum class BackgroundErrorReason {
@ -393,6 +395,21 @@ class EventListener {
// returns. Otherwise, RocksDB may be blocked.
virtual void OnStallConditionsChanged(const WriteStallInfo& /*info*/) {}
// A callback function for RocksDB which will be called just before
// starting the automatic recovery process for recoverable background
// errors, such as NoSpace(). The callback can suppress the automatic
// recovery by setting *auto_recovery to false. The database will then
// have to be transitioned out of read-only mode by calling DB::Resume()
virtual void OnErrorRecoveryBegin(BackgroundErrorReason /* reason */,
Status /* bg_error */,
bool* /* auto_recovery */) {}
// A callback function for RocksDB which will be called once the database
// is recovered from read-only mode after an error. When this is called, it
// means normal writes to the database can be issued and the user can
// initiate any further recovery actions needed
virtual void OnErrorRecoveryCompleted(Status /* old_bg_error */) {}
virtual ~EventListener() {}
};

@ -1983,6 +1983,52 @@ class Benchmark {
bool report_file_operations_;
bool use_blob_db_;
class ErrorHandlerListener : public EventListener {
public:
ErrorHandlerListener()
: mutex_(),
cv_(&mutex_),
no_auto_recovery_(false),
recovery_complete_(false) {}
~ErrorHandlerListener() {}
void OnErrorRecoveryBegin(BackgroundErrorReason /*reason*/,
Status /*bg_error*/, bool* auto_recovery) {
if (*auto_recovery && no_auto_recovery_) {
*auto_recovery = false;
}
}
void OnErrorRecoveryCompleted(Status /*old_bg_error*/) {
InstrumentedMutexLock l(&mutex_);
recovery_complete_ = true;
cv_.SignalAll();
}
bool WaitForRecovery(uint64_t /*abs_time_us*/) {
InstrumentedMutexLock l(&mutex_);
if (!recovery_complete_) {
cv_.Wait(/*abs_time_us*/);
}
if (recovery_complete_) {
recovery_complete_ = false;
return true;
}
return false;
}
void EnableAutoRecovery(bool enable = true) { no_auto_recovery_ = !enable; }
private:
InstrumentedMutex mutex_;
InstrumentedCondVar cv_;
bool no_auto_recovery_;
bool recovery_complete_;
};
std::shared_ptr<ErrorHandlerListener> listener_;
bool SanityCheck() {
if (FLAGS_compression_ratio > 1) {
fprintf(stderr, "compression_ratio should be between 0 and 1\n");
@ -2318,6 +2364,8 @@ class Benchmark {
}
}
}
listener_.reset(new ErrorHandlerListener());
}
~Benchmark() {
@ -3500,6 +3548,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
FLAGS_rate_limiter_auto_tuned));
}
options.listeners.emplace_back(listener_);
if (FLAGS_num_multi_db <= 1) {
OpenDb(options, FLAGS_db, &db_);
} else {
@ -3892,6 +3941,10 @@ void VerifyDBFromDB(std::string& truth_db_name) {
NewGenericRateLimiter(write_rate));
}
}
if (!s.ok()) {
s = listener_->WaitForRecovery(600000000) ? Status::OK() : s;
}
if (!s.ok()) {
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
exit(1);

@ -89,7 +89,7 @@ class DeleteSchedulerTest : public testing::Test {
std::string data(size, 'A');
EXPECT_OK(f->Append(data));
EXPECT_OK(f->Close());
sst_file_mgr_->OnAddFile(file_path);
sst_file_mgr_->OnAddFile(file_path, false);
return file_path;
}

@ -115,6 +115,16 @@ class FaultInjectionTestEnv : public EnvWrapper {
virtual Status RenameFile(const std::string& s,
const std::string& t) override;
virtual Status GetFreeSpace(const std::string& path,
uint64_t* disk_free) override {
if (!IsFilesystemActive() && error_ == Status::NoSpace()) {
*disk_free = 0;
return Status::OK();
} else {
return target()->GetFreeSpace(path, disk_free);
}
}
void WritableFileClosed(const FileState& state);
// For every file that is not fully synced, make a call to `func` with

@ -7,6 +7,7 @@
#include <vector>
#include "db/db_impl.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/sst_file_manager.h"
@ -23,20 +24,38 @@ SstFileManagerImpl::SstFileManagerImpl(Env* env, std::shared_ptr<Logger> logger,
: env_(env),
logger_(logger),
total_files_size_(0),
in_progress_files_size_(0),
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0),
delete_scheduler_(env, rate_bytes_per_sec, logger.get(), this,
max_trash_db_ratio, bytes_max_delete_chunk) {}
max_trash_db_ratio, bytes_max_delete_chunk),
cv_(&mu_),
closing_(false),
bg_thread_(nullptr),
reserved_disk_buffer_(0),
free_space_trigger_(0),
cur_instance_(nullptr) {
}
SstFileManagerImpl::~SstFileManagerImpl() {}
SstFileManagerImpl::~SstFileManagerImpl() {
{
MutexLock l(&mu_);
closing_ = true;
cv_.SignalAll();
}
if (bg_thread_) {
bg_thread_->join();
}
}
Status SstFileManagerImpl::OnAddFile(const std::string& file_path) {
Status SstFileManagerImpl::OnAddFile(const std::string& file_path,
bool compaction) {
uint64_t file_size;
Status s = env_->GetFileSize(file_path, &file_size);
if (s.ok()) {
MutexLock l(&mu_);
OnAddFileImpl(file_path, file_size);
OnAddFileImpl(file_path, file_size, compaction);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnAddFile");
return s;
@ -61,6 +80,19 @@ void SstFileManagerImpl::OnCompactionCompletion(Compaction* c) {
}
}
cur_compactions_reserved_size_ -= size_added_by_compaction;
auto new_files = c->edit()->GetNewFiles();
for (auto& new_file : new_files) {
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
new_file.second.fd.GetNumber(),
new_file.second.fd.GetPathId());
if (in_progress_files_.find(fn) != in_progress_files_.end()) {
auto tracked_file = tracked_files_.find(fn);
assert(tracked_file != tracked_files_.end());
in_progress_files_size_ -= tracked_file->second;
in_progress_files_.erase(fn);
}
}
}
Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
@ -71,7 +103,7 @@ Status SstFileManagerImpl::OnMoveFile(const std::string& old_path,
if (file_size != nullptr) {
*file_size = tracked_files_[old_path];
}
OnAddFileImpl(new_path, tracked_files_[old_path]);
OnAddFileImpl(new_path, tracked_files_[old_path], false);
OnDeleteFileImpl(old_path);
}
TEST_SYNC_POINT("SstFileManagerImpl::OnMoveFile");
@ -107,7 +139,8 @@ bool SstFileManagerImpl::IsMaxAllowedSpaceReachedIncludingCompactions() {
}
bool SstFileManagerImpl::EnoughRoomForCompaction(
const std::vector<CompactionInputFiles>& inputs) {
ColumnFamilyData* cfd, const std::vector<CompactionInputFiles>& inputs,
Status bg_error) {
MutexLock l(&mu_);
uint64_t size_added_by_compaction = 0;
// First check if we even have the space to do the compaction
@ -118,15 +151,47 @@ bool SstFileManagerImpl::EnoughRoomForCompaction(
}
}
// Update cur_compactions_reserved_size_ so concurrent compaction
// don't max out space
size_t needed_headroom =
cur_compactions_reserved_size_ + size_added_by_compaction +
compaction_buffer_size_;
if (max_allowed_space_ != 0 &&
(size_added_by_compaction + cur_compactions_reserved_size_ +
total_files_size_ + compaction_buffer_size_ >
max_allowed_space_)) {
(needed_headroom + total_files_size_ > max_allowed_space_)) {
return false;
}
// Update cur_compactions_reserved_size_ so concurrent compaction
// don't max out space
// Implement more aggressive checks only if this DB instance has already
// seen a NoSpace() error. This is tin order to contain a single potentially
// misbehaving DB instance and prevent it from slowing down compactions of
// other DB instances
if (CheckFreeSpace() && bg_error == Status::NoSpace()) {
auto fn =
TableFileName(cfd->ioptions()->cf_paths, inputs[0][0]->fd.GetNumber(),
inputs[0][0]->fd.GetPathId());
uint64_t free_space = 0;
env_->GetFreeSpace(fn, &free_space);
// needed_headroom is based on current size reserved by compactions,
// minus any files created by running compactions as they would count
// against the reserved size. If user didn't specify any compaction
// buffer, add reserved_disk_buffer_ that's calculated by default so the
// compaction doesn't end up leaving nothing for logs and flush SSTs
if (compaction_buffer_size_ == 0) {
needed_headroom += reserved_disk_buffer_;
}
needed_headroom -= in_progress_files_size_;
if (free_space < needed_headroom + size_added_by_compaction) {
// We hit the condition of not enough disk space
ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than "
"needed headroom [%d bytes]\n", free_space, needed_headroom);
return false;
}
}
cur_compactions_reserved_size_ += size_added_by_compaction;
// Take a snapshot of cur_compactions_reserved_size_ for when we encounter
// a NoSpace error.
free_space_trigger_ = cur_compactions_reserved_size_;
return true;
}
@ -166,6 +231,169 @@ uint64_t SstFileManagerImpl::GetTotalTrashSize() {
return delete_scheduler_.GetTotalTrashSize();
}
void SstFileManagerImpl::ReserveDiskBuffer(uint64_t size,
const std::string& path) {
MutexLock l(&mu_);
reserved_disk_buffer_ += size;
if (path_.empty()) {
path_ = path;
}
}
void SstFileManagerImpl::ClearError() {
while (true) {
MutexLock l(&mu_);
if (closing_) {
return;
}
uint64_t free_space;
Status s = env_->GetFreeSpace(path_, &free_space);
if (s.ok()) {
// In case of multi-DB instances, some of them may have experienced a
// soft error and some a hard error. In the SstFileManagerImpl, a hard
// error will basically override previously reported soft errors. Once
// we clear the hard error, we don't keep track of previous errors for
// now
if (bg_err_.severity() == Status::Severity::kHardError) {
if (free_space < reserved_disk_buffer_) {
ROCKS_LOG_ERROR(logger_, "free space [%d bytes] is less than "
"required disk buffer [%d bytes]\n", free_space,
reserved_disk_buffer_);
ROCKS_LOG_ERROR(logger_, "Cannot clear hard error\n");
s = Status::NoSpace();
}
} else if (bg_err_.severity() == Status::Severity::kSoftError) {
if (free_space < free_space_trigger_) {
ROCKS_LOG_WARN(logger_, "free space [%d bytes] is less than "
"free space for compaction trigger [%d bytes]\n", free_space,
free_space_trigger_);
ROCKS_LOG_WARN(logger_, "Cannot clear soft error\n");
s = Status::NoSpace();
}
}
}
// Someone could have called CancelErrorRecovery() and the list could have
// become empty, so check again here
if (s.ok() && !error_handler_list_.empty()) {
auto error_handler = error_handler_list_.front();
// Since we will release the mutex, set cur_instance_ to signal to the
// shutdown thread, if it calls // CancelErrorRecovery() the meantime,
// to indicate that this DB instance is busy. The DB instance is
// guaranteed to not be deleted before RecoverFromBGError() returns,
// since the ErrorHandler::recovery_in_prog_ flag would be true
cur_instance_ = error_handler;
mu_.Unlock();
s = error_handler->RecoverFromBGError();
mu_.Lock();
// The DB instance might have been deleted while we were
// waiting for the mutex, so check cur_instance_ to make sure its
// still non-null
if (cur_instance_) {
// Check for error again, since the instance may have recovered but
// immediately got another error. If that's the case, and the new
// error is also a NoSpace() non-fatal error, leave the instance in
// the list
Status err = cur_instance_->GetBGError();
if (s.ok() && err == Status::NoSpace() &&
err.severity() < Status::Severity::kFatalError) {
s = err;
}
cur_instance_ = nullptr;
}
if (s.ok() || s.IsShutdownInProgress() ||
(!s.ok() && s.severity() >= Status::Severity::kFatalError)) {
// If shutdown is in progress, abandon this handler instance
// and continue with the others
error_handler_list_.pop_front();
}
}
if (!error_handler_list_.empty()) {
// If there are more instances to be recovered, reschedule after 5
// seconds
int64_t wait_until = env_->NowMicros() + 5000000;
cv_.TimedWait(wait_until);
}
// Check again for error_handler_list_ empty, as a DB instance shutdown
// could have removed it from the queue while we were in timed wait
if (error_handler_list_.empty()) {
ROCKS_LOG_INFO(logger_, "Clearing error\n");
bg_err_ = Status::OK();
return;
}
}
}
void SstFileManagerImpl::StartErrorRecovery(ErrorHandler* handler,
Status bg_error) {
MutexLock l(&mu_);
if (bg_error.severity() == Status::Severity::kSoftError) {
if (bg_err_.ok()) {
// Setting bg_err_ basically means we're in degraded mode
// Assume that all pending compactions will fail similarly. The trigger
// for clearing this condition is set to current compaction reserved
// size, so we stop checking disk space available in
// EnoughRoomForCompaction once this much free space is available
bg_err_ = bg_error;
}
} else if (bg_error.severity() == Status::Severity::kHardError) {
bg_err_ = bg_error;
} else {
assert(false);
}
// If this is the first instance of this error, kick of a thread to poll
// and recover from this condition
if (error_handler_list_.empty()) {
error_handler_list_.push_back(handler);
// Release lock before calling join. Its ok to do so because
// error_handler_list_ is now non-empty, so no other invocation of this
// function will execute this piece of code
mu_.Unlock();
if (bg_thread_) {
bg_thread_->join();
}
// Start a new thread. The previous one would have exited.
bg_thread_.reset(new port::Thread(&SstFileManagerImpl::ClearError, this));
mu_.Lock();
} else {
// Check if this DB instance is already in the list
for (auto iter = error_handler_list_.begin();
iter != error_handler_list_.end(); ++iter) {
if ((*iter) == handler) {
return;
}
}
error_handler_list_.push_back(handler);
}
}
bool SstFileManagerImpl::CancelErrorRecovery(ErrorHandler* handler) {
MutexLock l(&mu_);
if (cur_instance_ == handler) {
// This instance is currently busy attempting to recover
// Nullify it so the recovery thread doesn't attempt to access it again
cur_instance_ = nullptr;
return false;
}
for (auto iter = error_handler_list_.begin();
iter != error_handler_list_.end(); ++iter) {
if ((*iter) == handler) {
error_handler_list_.erase(iter);
return true;
}
}
return false;
}
Status SstFileManagerImpl::ScheduleFileDeletion(
const std::string& file_path, const std::string& path_to_sync) {
return delete_scheduler_.DeleteFile(file_path, path_to_sync);
@ -176,14 +404,24 @@ void SstFileManagerImpl::WaitForEmptyTrash() {
}
void SstFileManagerImpl::OnAddFileImpl(const std::string& file_path,
uint64_t file_size) {
uint64_t file_size, bool compaction) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file != tracked_files_.end()) {
// File was added before, we will just update the size
assert(!compaction);
total_files_size_ -= tracked_file->second;
total_files_size_ += file_size;
cur_compactions_reserved_size_ -= file_size;
} else {
total_files_size_ += file_size;
if (compaction) {
// Keep track of the size of files created by in-progress compactions.
// When calculating whether there's enough headroom for new compactions,
// this will be subtracted from cur_compactions_reserved_size_.
// Otherwise, compactions will be double counted.
in_progress_files_size_ += file_size;
in_progress_files_.insert(file_path);
}
}
tracked_files_[file_path] = file_size;
}
@ -192,10 +430,16 @@ void SstFileManagerImpl::OnDeleteFileImpl(const std::string& file_path) {
auto tracked_file = tracked_files_.find(file_path);
if (tracked_file == tracked_files_.end()) {
// File is not tracked
assert(in_progress_files_.find(file_path) == in_progress_files_.end());
return;
}
total_files_size_ -= tracked_file->second;
// Check if it belonged to an in-progress compaction
if (in_progress_files_.find(file_path) != in_progress_files_.end()) {
in_progress_files_size_ -= tracked_file->second;
in_progress_files_.erase(file_path);
}
tracked_files_.erase(tracked_file);
}

@ -12,6 +12,7 @@
#include "port/port.h"
#include "db/compaction.h"
#include "db/error_handler.h"
#include "rocksdb/sst_file_manager.h"
#include "util/delete_scheduler.h"
@ -33,7 +34,7 @@ class SstFileManagerImpl : public SstFileManager {
~SstFileManagerImpl();
// DB will call OnAddFile whenever a new sst file is added.
Status OnAddFile(const std::string& file_path);
Status OnAddFile(const std::string& file_path, bool compaction = false);
// DB will call OnDeleteFile whenever an sst file is deleted.
Status OnDeleteFile(const std::string& file_path);
@ -67,7 +68,9 @@ class SstFileManagerImpl : public SstFileManager {
// estimates how much space is currently being used by compactions (i.e.
// if a compaction has started, this function bumps the used space by
// the full compaction size).
bool EnoughRoomForCompaction(const std::vector<CompactionInputFiles>& inputs);
bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
const std::vector<CompactionInputFiles>& inputs,
Status bg_error);
// Bookkeeping so total_file_sizes_ goes back to normal after compaction
// finishes
@ -96,6 +99,18 @@ class SstFileManagerImpl : public SstFileManager {
// Return the total size of trash files
uint64_t GetTotalTrashSize() override;
// Called by each DB instance using this sst file manager to reserve
// disk buffer space for recovery from out of space errors
void ReserveDiskBuffer(uint64_t buffer, const std::string& path);
// Set a flag upon encountering disk full. May enqueue the ErrorHandler
// instance for background polling and recovery
void StartErrorRecovery(ErrorHandler* db, Status bg_error);
// Remove the given Errorhandler instance from the recovery queue. Its
// not guaranteed
bool CancelErrorRecovery(ErrorHandler* db);
// Mark file as trash and schedule it's deletion.
virtual Status ScheduleFileDeletion(const std::string& file_path,
const std::string& dir_to_sync);
@ -108,16 +123,24 @@ class SstFileManagerImpl : public SstFileManager {
private:
// REQUIRES: mutex locked
void OnAddFileImpl(const std::string& file_path, uint64_t file_size);
void OnAddFileImpl(const std::string& file_path, uint64_t file_size,
bool compaction);
// REQUIRES: mutex locked
void OnDeleteFileImpl(const std::string& file_path);
void ClearError();
bool CheckFreeSpace() {
return bg_err_.severity() == Status::Severity::kSoftError;
}
Env* env_;
std::shared_ptr<Logger> logger_;
// Mutex to protect tracked_files_, total_files_size_
port::Mutex mu_;
// The summation of the sizes of all files in tracked_files_ map
uint64_t total_files_size_;
// The summation of all output files of in-progress compactions
uint64_t in_progress_files_size_;
// Compactions should only execute if they can leave at least
// this amount of buffer space for logs and flushes
uint64_t compaction_buffer_size_;
@ -126,10 +149,32 @@ class SstFileManagerImpl : public SstFileManager {
// A map containing all tracked files and there sizes
// file_path => file_size
std::unordered_map<std::string, uint64_t> tracked_files_;
// A set of files belonging to in-progress compactions
std::unordered_set<std::string> in_progress_files_;
// The maximum allowed space (in bytes) for sst files.
uint64_t max_allowed_space_;
// DeleteScheduler used to throttle file deletition.
DeleteScheduler delete_scheduler_;
port::CondVar cv_;
// Flag to force error recovery thread to exit
bool closing_;
// Background error recovery thread
std::unique_ptr<port::Thread> bg_thread_;
// A path in the filesystem corresponding to this SFM. This is used for
// calling Env::GetFreeSpace. Posix requires a path in the filesystem
std::string path_;
// Save the current background error
Status bg_err_;
// Amount of free disk headroom before allowing recovery from hard errors
uint64_t reserved_disk_buffer_;
// For soft errors, amount of free disk space before we can allow
// compactions to run full throttle. If disk space is below this trigger,
// compactions will be gated by free disk space > input size
uint64_t free_space_trigger_;
// List of database error handler instances tracked by this sst file manager
std::list<ErrorHandler*> error_handler_list_;
// Pointer to ErrorHandler instance that is currently processing recovery
ErrorHandler* cur_instance_;
};
} // namespace rocksdb

Loading…
Cancel
Save