First step towards handling MANIFEST write error (#6949)

Summary:
This PR provides preliminary support for handling IO error during MANIFEST write.
File write/sync is not guaranteed to be atomic. If we encounter an IOError while writing/syncing to the MANIFEST file, we cannot be sure about the state of the MANIFEST file. The version edits may or may not have reached the file. During cleanup, if we delete the newly-generated SST files referenced by the pending version edit(s), but the version edit(s) actually are persistent in the MANIFEST, then next recovery attempt will process the version edits(s) and then fail since the SST files have already been deleted.
One approach is to truncate the MANIFEST after write/sync error, so that it is safe to delete the SST files. However, file truncation may not be supported on certain file systems. Therefore, we take the following approach.
If an IOError is detected during MANIFEST write/sync, we disable file deletions for the faulty database. Depending on whether the IOError is retryable (set by underlying file system), either RocksDB or application can call `DB::Resume()`, or simply shutdown and restart. During `Resume()`, RocksDB will try to switch to a new MANIFEST and write all existing in-memory version storage in the new file. If this succeeds, then RocksDB may proceed. If all recovery is completed, then file deletions will be re-enabled.
Note that multiple threads can call `LogAndApply()` at the same time, though only one of them will be going through the process MANIFEST write, possibly batching the version edits of other threads. When the leading MANIFEST writer finishes, all of the MANIFEST writing threads in this batch will have the same IOError. They will all call `ErrorHandler::SetBGError()` in which file deletion will be disabled.

Possible future directions:
- Add an `ErrorContext` structure so that it is easier to pass more info to `ErrorHandler`. Currently, as in this example, a new `BackgroundErrorReason` has to be added.

Test plan (dev server):
make check
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6949

Reviewed By: anand1976

Differential Revision: D22026020

Pulled By: riversand963

fbshipit-source-id: f3c68a2ef45d9b505d0d625c7c5e0c88495b91c8
main
Yanqin Jin 5 years ago committed by Facebook GitHub Bot
parent 9cc25190e1
commit e66199d848
  1. 1
      HISTORY.md
  2. 2
      db/compaction/compaction_job.cc
  3. 27
      db/db_basic_test.cc
  4. 51
      db/db_filesnapshot.cc
  5. 45
      db/db_impl/db_impl.cc
  6. 11
      db/db_impl/db_impl.h
  7. 34
      db/db_impl/db_impl_compaction_flush.cc
  8. 56
      db/db_impl/db_impl_files.cc
  9. 3
      db/db_test.cc
  10. 30
      db/error_handler.cc
  11. 2
      db/internal_stats.cc
  12. 2
      db/memtable_list.cc
  13. 6
      db/version_set.cc
  14. 2
      db/version_set.h
  15. 3
      include/rocksdb/db.h
  16. 1
      include/rocksdb/listener.h

@ -8,6 +8,7 @@
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early. * In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too. * Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
* Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement. * Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.
* Disable file deletion after MANIFEST write/sync failure until db re-open or Resume() so that subsequent re-open will not see MANIFEST referencing deleted SSTs.
### Public API Change ### Public API Change
* `DB::GetDbSessionId(std::string& session_id)` is added. `session_id` stores a unique identifier that gets reset every time the DB is opened. This DB session ID should be unique among all open DB instances on all hosts, and should be unique among re-openings of the same or other DBs. This identifier is recorded in the LOG file on the line starting with "DB Session ID:". * `DB::GetDbSessionId(std::string& session_id)` is added. `session_id` stores a unique identifier that gets reset every time the DB is opened. This DB session ID should be unique among all open DB instances on all hosts, and should be unique among re-openings of the same or other DBs. This identifier is recorded in the LOG file on the line starting with "DB Session ID:".

@ -724,7 +724,7 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats( cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_); compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatusOK(); versions_->SetIOStatus(IOStatus::OK());
if (status.ok()) { if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options); status = InstallCompactionResults(mutable_cf_options);
} }

@ -3073,6 +3073,33 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
Close(); Close();
} }
TEST_F(DBBasicTest, ManifestWriteFailure) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
ASSERT_NE(nullptr, arg);
auto* s = reinterpret_cast<Status*>(arg);
ASSERT_OK(*s);
// Manually overwrite return status
*s = Status::IOError();
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("key", "value"));
ASSERT_NOK(Flush());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE
#ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS #ifdef ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS

@ -23,57 +23,6 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size, uint64_t* manifest_file_size,
bool flush_memtable) { bool flush_memtable) {

@ -313,8 +313,36 @@ Status DBImpl::ResumeImpl() {
} }
// Make sure the IO Status stored in version set is set to OK. // Make sure the IO Status stored in version set is set to OK.
bool file_deletion_disabled = !IsFileDeletionsEnabled();
if (s.ok()) { if (s.ok()) {
versions_->SetIOStatusOK(); IOStatus io_s = versions_->io_status();
if (io_s.IsIOError()) {
// If resuming from IOError resulted from MANIFEST write, then assert
// that we must have already set the MANIFEST writer to nullptr during
// clean-up phase MANIFEST writing. We must have also disabled file
// deletions.
assert(!versions_->descriptor_log_);
assert(file_deletion_disabled);
// Since we are trying to recover from MANIFEST write error, we need to
// switch to a new MANIFEST anyway. The old MANIFEST can be corrupted.
// Therefore, force writing a dummy version edit because we do not know
// whether there are flush jobs with non-empty data to flush, triggering
// appends to MANIFEST.
VersionEdit edit;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(default_cf_handle_);
assert(cfh);
ColumnFamilyData* cfd = cfh->cfd();
const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions();
s = versions_->LogAndApply(cfd, cf_opts, &edit, &mutex_,
directories_.GetDbDir());
if (!s.ok()) {
io_s = versions_->io_status();
if (!io_s.ok()) {
s = error_handler_.SetBGError(io_s,
BackgroundErrorReason::kManifestWrite);
}
}
}
} }
// We cannot guarantee consistency of the WAL. So force flush Memtables of // We cannot guarantee consistency of the WAL. So force flush Memtables of
@ -365,6 +393,13 @@ Status DBImpl::ResumeImpl() {
job_context.Clean(); job_context.Clean();
if (s.ok()) { if (s.ok()) {
assert(versions_->io_status().ok());
// If we reach here, we should re-enable file deletions if it was disabled
// during previous error handling.
if (file_deletion_disabled) {
// Always return ok
EnableFileDeletions(/*force=*/true);
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB"); ROCKS_LOG_INFO(immutable_db_options_.info_log, "Successfully resumed DB");
} }
mutex_.Lock(); mutex_.Lock();
@ -4405,6 +4440,14 @@ Status DBImpl::IngestExternalFiles(
#endif // !NDEBUG #endif // !NDEBUG
} }
} }
} else if (versions_->io_status().IsIOError()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
const IOStatus& io_s = versions_->io_status();
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
} }
// Resume writes to the DB // Resume writes to the DB

@ -358,6 +358,12 @@ class DBImpl : public DB {
virtual Status Close() override; virtual Status Close() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual bool IsFileDeletionsEnabled() const;
Status GetStatsHistory( Status GetStatsHistory(
uint64_t start_time, uint64_t end_time, uint64_t start_time, uint64_t end_time,
std::unique_ptr<StatsHistoryIterator>* stats_iterator) override; std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
@ -365,9 +371,6 @@ class DBImpl : public DB {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
using DB::ResetStats; using DB::ResetStats;
virtual Status ResetStats() override; virtual Status ResetStats() override;
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;
// All the returned filenames start with "/" // All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&, virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size, uint64_t* manifest_file_size,
@ -1789,6 +1792,8 @@ class DBImpl : public DB {
SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback, SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback,
bool* is_blob_index); bool* is_blob_index);
Status DisableFileDeletionsWithLock();
// table_cache_ provides its own synchronization // table_cache_ provides its own synchronization
std::shared_ptr<Cache> table_cache_; std::shared_ptr<Cache> table_cache_;

@ -211,7 +211,15 @@ Status DBImpl::FlushMemTableToOutputFile(
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsShutdownInProgress() && if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) { !io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); // Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else { } else {
Status new_bg_error = s; Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
@ -575,7 +583,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// it is not because of CF drop. // it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) { if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) { if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); // Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kFlush
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else { } else {
Status new_bg_error = s; Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
@ -2689,7 +2705,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) { for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
} }
versions_->SetIOStatusOK(); versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
@ -2747,7 +2763,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} }
} }
versions_->SetIOStatusOK(); versions_->SetIOStatus(IOStatus::OK());
status = versions_->LogAndApply(c->column_family_data(), status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(), *c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir()); &mutex_, directories_.GetDbDir());
@ -2880,7 +2896,15 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str()); status.ToString().c_str());
if (!io_s.ok()) { if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction); // Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kCompaction
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
} else { } else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
} }

@ -36,6 +36,62 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
return std::numeric_limits<uint64_t>::max(); return std::numeric_limits<uint64_t>::max();
} }
Status DBImpl::DisableFileDeletions() {
InstrumentedMutexLock l(&mutex_);
return DisableFileDeletionsWithLock();
}
Status DBImpl::DisableFileDeletionsWithLock() {
mutex_.AssertHeld();
++disable_delete_obsolete_files_;
if (disable_delete_obsolete_files_ == 1) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Disabled, but already disabled. Counter: %d",
disable_delete_obsolete_files_);
}
return Status::OK();
}
Status DBImpl::EnableFileDeletions(bool force) {
// Job id == 0 means that this is not our background process, but rather
// user thread
JobContext job_context(0);
bool file_deletion_enabled = false;
{
InstrumentedMutexLock l(&mutex_);
if (force) {
// if force, we need to enable file deletions right away
disable_delete_obsolete_files_ = 0;
} else if (disable_delete_obsolete_files_ > 0) {
--disable_delete_obsolete_files_;
}
if (disable_delete_obsolete_files_ == 0) {
file_deletion_enabled = true;
FindObsoleteFiles(&job_context, true);
bg_cv_.SignalAll();
}
}
if (file_deletion_enabled) {
ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
if (job_context.HaveSomethingToDelete()) {
PurgeObsoleteFiles(job_context);
}
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log,
"File Deletions Enable, but not really enabled. Counter: %d",
disable_delete_obsolete_files_);
}
job_context.Clean();
LogFlush(immutable_db_options_.info_log);
return Status::OK();
}
bool DBImpl::IsFileDeletionsEnabled() const {
return 0 == disable_delete_obsolete_files_;
}
// * Returns the list of live files in 'sst_live' and 'blob_live'. // * Returns the list of live files in 'sst_live' and 'blob_live'.
// If it's doing full scan: // If it's doing full scan:
// * Returns the list of all files in the filesystem in // * Returns the list of all files in the filesystem in

@ -2990,10 +2990,11 @@ class ModelDB : public DB {
Status SyncWAL() override { return Status::OK(); } Status SyncWAL() override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status DisableFileDeletions() override { return Status::OK(); } Status DisableFileDeletions() override { return Status::OK(); }
Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); } Status EnableFileDeletions(bool /*force*/) override { return Status::OK(); }
#ifndef ROCKSDB_LITE
Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/, Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
bool /*flush_memtable*/ = true) override { bool /*flush_memtable*/ = true) override {
return Status::OK(); return Status::OK();

@ -51,9 +51,19 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, Status::SubCode, bool>,
Status::Code::kIOError, Status::SubCode::kNoSpace, Status::Code::kIOError, Status::SubCode::kNoSpace,
false), false),
Status::Severity::kHardError}, Status::Severity::kHardError},
// Errors during MANIFEST write
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
true),
Status::Severity::kHardError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, Status::SubCode::kNoSpace,
false),
Status::Severity::kHardError},
}; };
std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity> std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>,
Status::Severity>
DefaultErrorSeverityMap = { DefaultErrorSeverityMap = {
// Errors during BG compaction // Errors during BG compaction
{std::make_tuple(BackgroundErrorReason::kCompaction, {std::make_tuple(BackgroundErrorReason::kCompaction,
@ -75,11 +85,11 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kFlush, {std::make_tuple(BackgroundErrorReason::kFlush,
Status::Code::kCorruption, false), Status::Code::kCorruption, false),
Status::Severity::kNoError}, Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kFlush, {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::Code::kIOError, true), true),
Status::Severity::kFatalError}, Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kFlush, {std::make_tuple(BackgroundErrorReason::kFlush, Status::Code::kIOError,
Status::Code::kIOError, false), false),
Status::Severity::kNoError}, Status::Severity::kNoError},
// Errors during Write // Errors during Write
{std::make_tuple(BackgroundErrorReason::kWriteCallback, {std::make_tuple(BackgroundErrorReason::kWriteCallback,
@ -94,6 +104,12 @@ std::map<std::tuple<BackgroundErrorReason, Status::Code, bool>, Status::Severity
{std::make_tuple(BackgroundErrorReason::kWriteCallback, {std::make_tuple(BackgroundErrorReason::kWriteCallback,
Status::Code::kIOError, false), Status::Code::kIOError, false),
Status::Severity::kNoError}, Status::Severity::kNoError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, true),
Status::Severity::kFatalError},
{std::make_tuple(BackgroundErrorReason::kManifestWrite,
Status::Code::kIOError, false),
Status::Severity::kFatalError},
}; };
std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity> std::map<std::tuple<BackgroundErrorReason, bool>, Status::Severity>
@ -247,6 +263,10 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
if (recovery_in_prog_ && recovery_error_.ok()) { if (recovery_in_prog_ && recovery_error_.ok()) {
recovery_error_ = bg_io_err; recovery_error_ = bg_io_err;
} }
if (BackgroundErrorReason::kManifestWrite == reason) {
// Always returns ok
db_->DisableFileDeletionsWithLock();
}
Status new_bg_io_err = bg_io_err; Status new_bg_io_err = bg_io_err;
Status s; Status s;
if (bg_io_err.GetDataLoss()) { if (bg_io_err.GetDataLoss()) {

@ -798,7 +798,7 @@ bool InternalStats::HandleCurrentSuperVersionNumber(uint64_t* value,
bool InternalStats::HandleIsFileDeletionsEnabled(uint64_t* value, DBImpl* db, bool InternalStats::HandleIsFileDeletionsEnabled(uint64_t* value, DBImpl* db,
Version* /*version*/) { Version* /*version*/) {
*value = db->IsFileDeletionsEnabled(); *value = db->IsFileDeletionsEnabled() ? 1 : 0;
return true; return true;
} }

@ -470,7 +470,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
} }
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
vset->SetIOStatusOK(); vset->SetIOStatus(IOStatus::OK());
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory); db_directory);
*io_s = vset->io_status(); *io_s = vset->io_status();

@ -4001,12 +4001,16 @@ Status VersionSet::ProcessManifestWrites(
} }
if (s.ok()) { if (s.ok()) {
io_s = SyncManifest(env_, db_options_, descriptor_log_->file()); io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
} }
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s; io_status_ = io_s;
s = io_s; s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str()); s.ToString().c_str());
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
} }
} }
@ -4018,6 +4022,8 @@ Status VersionSet::ProcessManifestWrites(
if (!io_s.ok()) { if (!io_s.ok()) {
io_status_ = io_s; io_status_ = io_s;
s = io_s; s = io_s;
} else if (io_status_.IsIOError()) {
io_status_ = io_s;
} }
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
} }

@ -1162,7 +1162,7 @@ class VersionSet {
IOStatus io_status() const { return io_status_; } IOStatus io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed. // Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatusOK() { io_status_ = IOStatus::OK(); } void SetIOStatus(const IOStatus& s) { io_status_ = s; }
protected: protected:
using VersionBuilderMap = using VersionBuilderMap =

@ -1266,8 +1266,6 @@ class DB {
// updated, false if user attempted to call if with seqnum <= current value. // updated, false if user attempted to call if with seqnum <= current value.
virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0; virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) = 0;
#ifndef ROCKSDB_LITE
// Prevent file deletions. Compactions will continue to occur, // Prevent file deletions. Compactions will continue to occur,
// but no obsolete files will be deleted. Calling this multiple // but no obsolete files will be deleted. Calling this multiple
// times have the same effect as calling it once. // times have the same effect as calling it once.
@ -1284,6 +1282,7 @@ class DB {
// threads call EnableFileDeletions() // threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0; virtual Status EnableFileDeletions(bool force = true) = 0;
#ifndef ROCKSDB_LITE
// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup // GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
// Retrieve the list of all files in the database. The files are // Retrieve the list of all files in the database. The files are

@ -117,6 +117,7 @@ enum class BackgroundErrorReason {
kCompaction, kCompaction,
kWriteCallback, kWriteCallback,
kMemTable, kMemTable,
kManifestWrite,
}; };
enum class WriteStallCondition { enum class WriteStallCondition {

Loading…
Cancel
Save