Introduce OnBackgroundError callback

Summary:
Some users want to prevent RocksDB from entering read-only mode in certain error cases. This diff gives them a callback, `OnBackgroundError`, that they can use to achieve that; a user-side sketch follows the list below.

- Call `OnBackgroundError` every time we consider setting `bg_error_`. Its result is used to assign `bg_error_`, but it does not change the function's return status.
- Classify calls with a `BackgroundErrorReason` so the callback gets some information about where the error happened.
- Rename `ParanoidCheck` to the more specific `WriteCallbackStatusCheck` so we can provide a clear `BackgroundErrorReason`.
- Add unit tests for the most common cases: flush and compaction errors.
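
For illustration, a minimal user-side sketch of the new callback: an application registers a listener whose `OnBackgroundError` suppresses flush errors so the DB stays writable. `OnBackgroundError`, `BackgroundErrorReason`, and `Options::listeners` are the API touched by this diff; the listener class name, the database path, and the suppress-only-flush-errors policy are assumptions made for the example, not part of the change.

```cpp
#include <memory>

#include "rocksdb/db.h"
#include "rocksdb/listener.h"
#include "rocksdb/options.h"

// Hypothetical policy: suppress flush errors so the DB stays writable, but let
// every other background error set bg_error_ as usual.
class IgnoreFlushErrorsListener : public rocksdb::EventListener {
 public:
  void OnBackgroundError(rocksdb::BackgroundErrorReason reason,
                         rocksdb::Status* bg_error) override {
    if (reason == rocksdb::BackgroundErrorReason::kFlush) {
      // Resetting the status to OK prevents the DB from entering read-only
      // mode. Caveat from listener.h: there is no guarantee when the failed
      // flush will be rescheduled.
      *bg_error = rocksdb::Status::OK();
    }
  }
};

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.listeners.push_back(std::make_shared<IgnoreFlushErrorsListener>());

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/bg_error_demo", &db);
  if (s.ok()) {
    s = db->Put(rocksdb::WriteOptions(), "key", "val");
    delete db;
  }
  return s.ok() ? 0 : 1;
}
```

The callback is invoked with the DB mutex released, but it still runs on background or write threads, so it should stay cheap.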
Closes https://github.com/facebook/rocksdb/pull/2477

Differential Revision: D5300190

Pulled By: ajkr

fbshipit-source-id: a0ea4564249719b83428e3f4c6ca2c49e366e9b3
Branch: main
Author: Andrew Kryczka (committed by Facebook Github Bot)
Parent: 88cd2d96e7
Commit: 71f5bcb730
Changed files (lines changed):
  db/compaction_job.cc              18
  db/db_impl.h                       2
  db/db_impl_compaction_flush.cc    52
  db/db_impl_write.cc               25
  db/event_helpers.cc               18
  db/event_helpers.h                 4
  db/listener_test.cc               86
  include/rocksdb/listener.h        21

db/compaction_job.cc
@@ -1133,12 +1133,22 @@ Status CompactionJob::FinishCompactionOutputFile(
                               meta->fd.GetPathId());
       sfm->OnAddFile(fn);
       if (sfm->IsMaxAllowedSpaceReached()) {
-        InstrumentedMutexLock l(db_mutex_);
-        if (db_bg_error_->ok()) {
+        // TODO(ajkr): should we return OK() if max space was reached by the final
+        // compaction output file (similarly to how flush works when full)?
         s = Status::IOError("Max allowed space was reached");
-        *db_bg_error_ = s;
         TEST_SYNC_POINT(
-            "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
+            "CompactionJob::FinishCompactionOutputFile:"
+            "MaxAllowedSpaceReached");
+        InstrumentedMutexLock l(db_mutex_);
+        if (db_bg_error_->ok()) {
+          Status new_bg_error = s;
+          // may temporarily unlock and lock the mutex.
+          EventHelpers::NotifyOnBackgroundError(
+              cfd->ioptions()->listeners, BackgroundErrorReason::kCompaction,
+              &new_bg_error, db_mutex_);
+          if (!new_bg_error.ok()) {
+            *db_bg_error_ = new_bg_error;
+          }
         }
       }
     }

db/db_impl.h
@@ -751,7 +751,7 @@ class DBImpl : public DB {
                     bool need_log_dir_sync, SequenceNumber sequence);
 
   // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
-  void ParanoidCheck(const Status& status);
+  void WriteCallbackStatusCheck(const Status& status);
 
   // Used by WriteImpl to update bg_error_ in case of memtable insert error.
   void MemTableInsertStatusCheck(const Status& memtable_insert_status);

db/db_impl_compaction_flush.cc
@@ -16,6 +16,7 @@
 #include <inttypes.h>
 
 #include "db/builder.h"
+#include "db/event_helpers.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_updater.h"
@@ -61,7 +62,14 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
     // "number < current_log_number".
     MarkLogsSynced(current_log_number - 1, true, s);
     if (!s.ok()) {
-      bg_error_ = s;
+      Status new_bg_error = s;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kFlush,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
       TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
       return s;
     }
@@ -136,9 +144,16 @@ Status DBImpl::FlushMemTableToOutputFile(
 
   if (!s.ok() && !s.IsShutdownInProgress() &&
       immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-    // if a bad error happened (not ShutdownInProgress) and paranoid_checks is
-    // true, mark DB read-only
-    bg_error_ = s;
+    Status new_bg_error = s;
+    // may temporarily unlock and lock the mutex.
+    EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                          BackgroundErrorReason::kFlush,
+                                          &new_bg_error, &mutex_);
+    if (!new_bg_error.ok()) {
+      // if a bad error happened (not ShutdownInProgress), paranoid_checks is
+      // true, and the error isn't handled by callback, mark DB read-only
+      bg_error_ = new_bg_error;
+    }
   }
   if (s.ok()) {
 #ifndef ROCKSDB_LITE
@@ -153,10 +168,17 @@ Status DBImpl::FlushMemTableToOutputFile(
           immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber());
       sfm->OnAddFile(file_path);
       if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
-        bg_error_ = Status::IOError("Max allowed space was reached");
+        Status new_bg_error = Status::IOError("Max allowed space was reached");
         TEST_SYNC_POINT_CALLBACK(
             "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
-            &bg_error_);
+            &new_bg_error);
+        // may temporarily unlock and lock the mutex.
+        EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                              BackgroundErrorReason::kFlush,
+                                              &new_bg_error, &mutex_);
+        if (!new_bg_error.ok()) {
+          bg_error_ = new_bg_error;
+        }
       }
     }
 #endif  // ROCKSDB_LITE
@@ -567,7 +589,14 @@ Status DBImpl::CompactFilesImpl(
                    c->column_family_data()->GetName().c_str(),
                    job_context->job_id, status.ToString().c_str());
     if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-      bg_error_ = status;
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kCompaction,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
     }
   }
 
@@ -1625,7 +1654,14 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
                    status.ToString().c_str());
     if (immutable_db_options_.paranoid_checks && bg_error_.ok()) {
-      bg_error_ = status;
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kCompaction,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;
+      }
     }
   }

db/db_impl_write.cc
@@ -14,6 +14,7 @@
 #define __STDC_FORMAT_MACROS
 #endif
 #include <inttypes.h>
+#include "db/event_helpers.h"
 #include "monitoring/perf_context_imp.h"
 #include "options/options_helper.h"
 #include "util/sync_point.h"
@@ -255,7 +256,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
     PERF_TIMER_START(write_pre_and_post_process_time);
 
     if (!w.CallbackFailed()) {
-      ParanoidCheck(status);
+      WriteCallbackStatusCheck(status);
     }
 
     if (need_log_sync) {
@@ -356,7 +357,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
     }
 
     if (!w.CallbackFailed()) {
-      ParanoidCheck(w.status);
+      WriteCallbackStatusCheck(w.status);
     }
 
     if (need_log_sync) {
@@ -409,14 +410,21 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
   return w.FinalStatus();
 }
 
-void DBImpl::ParanoidCheck(const Status& status) {
+void DBImpl::WriteCallbackStatusCheck(const Status& status) {
   // Is setting bg_error_ enough here? This will at least stop
   // compaction and fail any further writes.
   if (immutable_db_options_.paranoid_checks && !status.ok() &&
       !status.IsBusy() && !status.IsIncomplete()) {
     mutex_.Lock();
     if (bg_error_.ok()) {
-      bg_error_ = status;  // stop compaction & fail any further writes
+      Status new_bg_error = status;
+      // may temporarily unlock and lock the mutex.
+      EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                            BackgroundErrorReason::kWriteCallback,
+                                            &new_bg_error, &mutex_);
+      if (!new_bg_error.ok()) {
+        bg_error_ = new_bg_error;  // stop compaction & fail any further writes
+      }
     }
     mutex_.Unlock();
   }
@@ -431,7 +439,14 @@ void DBImpl::MemTableInsertStatusCheck(const Status& status) {
   if (!status.ok()) {
     mutex_.Lock();
     assert(bg_error_.ok());
-    bg_error_ = status;
+    Status new_bg_error = status;
+    // may temporarily unlock and lock the mutex.
+    EventHelpers::NotifyOnBackgroundError(immutable_db_options_.listeners,
+                                          BackgroundErrorReason::kMemTable,
+                                          &new_bg_error, &mutex_);
+    if (!new_bg_error.ok()) {
+      bg_error_ = new_bg_error;  // stop compaction & fail any further writes
+    }
     mutex_.Unlock();
   }
 }

db/event_helpers.cc
@@ -39,6 +39,24 @@ void EventHelpers::NotifyTableFileCreationStarted(
 }
 #endif  // !ROCKSDB_LITE
 
+void EventHelpers::NotifyOnBackgroundError(
+    const std::vector<std::shared_ptr<EventListener>>& listeners,
+    BackgroundErrorReason reason, Status* bg_error,
+    InstrumentedMutex* db_mutex) {
+#ifndef ROCKSDB_LITE
+  if (listeners.size() == 0U) {
+    return;
+  }
+  db_mutex->AssertHeld();
+  // release lock while notifying events
+  db_mutex->Unlock();
+  for (auto& listener : listeners) {
+    listener->OnBackgroundError(reason, bg_error);
+  }
+  db_mutex->Lock();
+#endif  // ROCKSDB_LITE
+}
+
 void EventHelpers::LogAndNotifyTableFileCreationFinished(
     EventLogger* event_logger,
     const std::vector<std::shared_ptr<EventListener>>& listeners,

db/event_helpers.h
@@ -27,6 +27,10 @@ class EventHelpers {
       const std::string& db_name, const std::string& cf_name,
       const std::string& file_path, int job_id, TableFileCreationReason reason);
 #endif  // !ROCKSDB_LITE
+  static void NotifyOnBackgroundError(
+      const std::vector<std::shared_ptr<EventListener>>& listeners,
+      BackgroundErrorReason reason, Status* bg_error,
+      InstrumentedMutex* db_mutex);
   static void LogAndNotifyTableFileCreationFinished(
       EventLogger* event_logger,
       const std::vector<std::shared_ptr<EventListener>>& listeners,

db/listener_test.cc
@@ -792,6 +792,92 @@ TEST_F(EventListenerTest, ColumnFamilyHandleDeletionStartedListenerTest) {
   ASSERT_EQ(listener->getCounter(), 3);
 }
 
+class BackgroundErrorListener : public EventListener {
+ private:
+  SpecialEnv* env_;
+  int counter_;
+
+ public:
+  BackgroundErrorListener(SpecialEnv* env) : env_(env), counter_(0) {}
+
+  void OnBackgroundError(BackgroundErrorReason reason, Status* bg_error) override {
+    if (counter_ == 0) {
+      // suppress the first error and disable write-dropping such that a retry
+      // can succeed.
+      *bg_error = Status::OK();
+      env_->drop_writes_.store(false, std::memory_order_release);
+      env_->no_slowdown_ = false;
+    }
+    ++counter_;
+  }
+
+  int counter() { return counter_; }
+};
+
+TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
+  auto listener = std::make_shared<BackgroundErrorListener>(env_);
+  Options options;
+  options.create_if_missing = true;
+  options.env = env_;
+  options.listeners.push_back(listener);
+  options.memtable_factory.reset(new SpecialSkipListFactory(1));
+  options.paranoid_checks = true;
+  DestroyAndReopen(options);
+
+  // the usual TEST_WaitForFlushMemTable() doesn't work for failed flushes, so
+  // forge a custom one for the failed flush case.
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkFlush:done",
+        "EventListenerTest:BackgroundErrorListenerFailedFlushTest:1"}});
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  env_->drop_writes_.store(true, std::memory_order_release);
+  env_->no_slowdown_ = true;
+
+  ASSERT_OK(Put("key0", "val"));
+  ASSERT_OK(Put("key1", "val"));
+  TEST_SYNC_POINT("EventListenerTest:BackgroundErrorListenerFailedFlushTest:1");
+  ASSERT_EQ(1, listener->counter());
+
+  ASSERT_OK(Put("key2", "val"));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_EQ(1, NumTableFilesAtLevel(0));
+}
+
+TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
+  auto listener = std::make_shared<BackgroundErrorListener>(env_);
+  Options options;
+  options.create_if_missing = true;
+  options.disable_auto_compactions = true;
+  options.env = env_;
+  options.level0_file_num_compaction_trigger = 2;
+  options.listeners.push_back(listener);
+  options.memtable_factory.reset(new SpecialSkipListFactory(2));
+  options.paranoid_checks = true;
+  DestroyAndReopen(options);
+
+  // third iteration triggers the second memtable's flush
+  for (int i = 0; i < 3; ++i) {
+    ASSERT_OK(Put("key0", "val"));
+    if (i > 0) {
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+    }
+    ASSERT_OK(Put("key1", "val"));
+  }
+  ASSERT_EQ(2, NumTableFilesAtLevel(0));
+
+  env_->drop_writes_.store(true, std::memory_order_release);
+  env_->no_slowdown_ = true;
+  ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(1, listener->counter());
+
+  // trigger flush so compaction is triggered again; this time it succeeds
+  ASSERT_OK(Put("key0", "val"));
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_EQ(0, NumTableFilesAtLevel(0));
+}
+
 }  // namespace rocksdb
 
 #endif  // ROCKSDB_LITE

include/rocksdb/listener.h
@@ -77,6 +77,13 @@ enum class CompactionReason {
   kFilesMarkedForCompaction,
 };
 
+enum class BackgroundErrorReason {
+  kFlush,
+  kCompaction,
+  kWriteCallback,
+  kMemTable,
+};
+
 #ifndef ROCKSDB_LITE
 
 struct TableFileDeletionInfo {
@@ -348,6 +355,20 @@ class EventListener {
   virtual void OnExternalFileIngested(
       DB* /*db*/, const ExternalFileIngestionInfo& /*info*/) {}
 
+  // A call-back function for RocksDB which will be called before setting the
+  // background error status to a non-OK value. The new background error status
+  // is provided in `bg_error` and can be modified by the callback. E.g., a
+  // callback can suppress errors by resetting it to Status::OK(), thus
+  // preventing the database from entering read-only mode. We do not provide any
+  // guarantee when failed flushes/compactions will be rescheduled if the user
+  // suppresses an error.
+  //
+  // Note that this function can run on the same threads as flush, compaction,
+  // and user writes. So, it is extremely important not to perform heavy
+  // computations or blocking calls in this function.
+  virtual void OnBackgroundError(BackgroundErrorReason /* reason */,
+                                 Status* /* bg_error */) {}
+
   // Factory method to return CompactionEventListener. If multiple listeners
   // provides CompactionEventListner, only the first one will be used.
   virtual CompactionEventListener* GetCompactionEventListener() {
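
The documentation added above warns that `OnBackgroundError` can run on the flush, compaction, and user-write threads, so the callback itself must stay cheap. One way to honor that, sketched below under assumptions not in this diff (the class name and the queue-plus-worker-thread scheme are illustrative), is to record the error quickly and hand the heavy reporting to a dedicated thread:

```cpp
#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

#include "rocksdb/listener.h"

// Illustrative listener: record the error quickly on the RocksDB thread and do
// the expensive reporting on a dedicated worker thread.
class AsyncBackgroundErrorReporter : public rocksdb::EventListener {
 public:
  AsyncBackgroundErrorReporter() : worker_([this]() { Run(); }) {}

  ~AsyncBackgroundErrorReporter() override {
    {
      std::lock_guard<std::mutex> lk(mu_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  void OnBackgroundError(rocksdb::BackgroundErrorReason /*reason*/,
                         rocksdb::Status* bg_error) override {
    // Keep this fast: just enqueue a copy of the error message. Leaving
    // *bg_error untouched means the DB still enters read-only mode.
    {
      std::lock_guard<std::mutex> lk(mu_);
      pending_.push_back(bg_error->ToString());
    }
    cv_.notify_one();
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lk(mu_);
    while (!done_ || !pending_.empty()) {
      cv_.wait(lk, [this]() { return done_ || !pending_.empty(); });
      while (!pending_.empty()) {
        std::string msg = std::move(pending_.front());
        pending_.pop_front();
        lk.unlock();
        // Heavy work (paging, metrics upload, ...) happens off the DB threads.
        fprintf(stderr, "background error: %s\n", msg.c_str());
        lk.lock();
      }
    }
  }

  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<std::string> pending_;
  bool done_ = false;
  std::thread worker_;  // declared last so the other members are ready first
};
```

Since this sketch never assigns `Status::OK()` to `*bg_error`, the database still becomes read-only after the error; only resetting the status in the callback suppresses it.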
