diff --git a/HISTORY.md b/HISTORY.md
index 9efb85ec5..c73d5da71 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -8,6 +8,7 @@
* Add support for user-defined timestamps to write-committed transaction without API change. The `TransactionDB` layer APIs do not allow timestamps because we require that all user-defined-timestamps-aware operations go through the `Transaction` APIs.
* Added BlobDB options to `ldb`
* `BlockBasedTableOptions::detect_filter_construct_corruption` can now be dynamically configured using `DB::SetOptions`.
+* Automatically recover from retryable read IO errors during backgorund flush/compaction.
### Bug Fixes
* Fixed a data race on `versions_` between `DBImpl::ResumeImpl()` and threads waiting for recovery to complete (#9496)
@@ -18,6 +19,7 @@
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
* Fixed a bug that `Iterator::Refresh()` reads stale keys after DeleteRange() performed.
* Fixed a race condition when disable and re-enable manual compaction.
+* Fixed automatic error recovery failure in atomic flush.
### Public API changes
* Remove BlockBasedTableOptions.hash_index_allow_collision which already takes no effect.
diff --git a/db/builder.cc b/db/builder.cc
index 48f338546..1bcd6afc1 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -318,6 +318,7 @@ Status BuildTable(
// TODO Also check the IO status when create the Iterator.
+ TEST_SYNC_POINT("BuildTable:BeforeOutputValidation");
if (s.ok() && !empty) {
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index ab7dc10e3..e22aad567 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -263,11 +263,6 @@ Status DBImpl::FlushMemTableToOutputFile(
if (!s.ok() && need_cancel) {
flush_job.Cancel();
}
- IOStatus io_s = IOStatus::OK();
- io_s = flush_job.io_status();
- if (s.ok()) {
- s = io_s;
- }
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
@@ -303,9 +298,7 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
- if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
- !io_s.IsColumnFamilyDropped()) {
- assert(log_io_s.ok());
+ if (log_io_s.ok()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
@@ -316,24 +309,19 @@ Status DBImpl::FlushMemTableToOutputFile(
// error), all the Manifest write will be map to soft error.
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor is
// needed.
- error_handler_.SetBGError(io_s,
+ error_handler_.SetBGError(s,
BackgroundErrorReason::kManifestWriteNoWAL);
} else {
// If WAL sync is successful (either WAL size is 0 or there is no IO
// error), all the other SST file write errors will be set as
// kFlushNoWAL.
- error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
+ error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
}
} else {
- if (log_io_s.ok()) {
- Status new_bg_error = s;
- error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
- }
+ assert(s == log_io_s);
+ Status new_bg_error = s;
+ error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
- } else {
- // If we got here, then we decided not to care about the i_os status (either
- // from never needing it or ignoring the flush job status
- io_s.PermitUncheckedError();
}
// If flush ran smoothly and no mempurge happened
// install new SST file path.
@@ -416,6 +404,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
for (const auto cfd : cfds) {
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
+ assert(cfd->GetFlushReason() == cfds[0]->GetFlushReason());
}
#endif /* !NDEBUG */
@@ -502,12 +491,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// exec_status stores the execution status of flush_jobs as
//
autovector> exec_status;
- autovector io_status;
std::vector pick_status;
for (int i = 0; i != num_cfs; ++i) {
// Initially all jobs are not executed, with status OK.
exec_status.emplace_back(false, Status::OK());
- io_status.emplace_back(IOStatus::OK());
pick_status.push_back(false);
}
@@ -527,7 +514,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i],
&(switched_to_mempurge.at(i)));
exec_status[i].first = true;
- io_status[i] = jobs[i]->io_status();
}
if (num_cfs > 1) {
TEST_SYNC_POINT(
@@ -541,7 +527,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
&logs_with_prep_tracker_, file_meta.data() /* &file_meta[0] */,
switched_to_mempurge.empty() ? nullptr : &(switched_to_mempurge.at(0)));
exec_status[0].first = true;
- io_status[0] = jobs[0]->io_status();
Status error_status;
for (const auto& e : exec_status) {
@@ -560,21 +545,6 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = error_status.ok() ? s : error_status;
}
- IOStatus io_s = IOStatus::OK();
- if (io_s.ok()) {
- IOStatus io_error = IOStatus::OK();
- for (int i = 0; i != static_cast(io_status.size()); i++) {
- if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
- !io_status[i].IsColumnFamilyDropped()) {
- io_error = io_status[i];
- }
- }
- io_s = io_error;
- if (s.ok() && !io_s.ok()) {
- s = io_s;
- }
- }
-
if (s.IsColumnFamilyDropped()) {
s = Status::OK();
}
@@ -647,7 +617,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
return std::make_pair(Status::OK(), !ready);
};
- bool resuming_from_bg_err = error_handler_.IsDBStopped();
+ bool resuming_from_bg_err =
+ error_handler_.IsDBStopped() ||
+ (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
+ cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
std::pair res = wait_to_install_func();
@@ -662,7 +635,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
atomic_flush_install_cv_.Wait();
- resuming_from_bg_err = error_handler_.IsDBStopped();
+ resuming_from_bg_err =
+ error_handler_.IsDBStopped() ||
+ (cfds[0]->GetFlushReason() == FlushReason::kErrorRecovery ||
+ cfds[0]->GetFlushReason() == FlushReason::kErrorRecoveryRetryFlush);
}
if (!resuming_from_bg_err) {
@@ -786,8 +762,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
- if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) {
- assert(log_io_s.ok());
+ if (log_io_s.ok()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
// CURRENT file. With current code, it's just difficult to tell. So just
@@ -798,19 +773,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// error), all the Manifest write will be map to soft error.
// TODO: kManifestWriteNoWAL and kFlushNoWAL are misleading. Refactor
// is needed.
- error_handler_.SetBGError(io_s,
+ error_handler_.SetBGError(s,
BackgroundErrorReason::kManifestWriteNoWAL);
} else {
// If WAL sync is successful (either WAL size is 0 or there is no IO
// error), all the other SST file write errors will be set as
// kFlushNoWAL.
- error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
+ error_handler_.SetBGError(s, BackgroundErrorReason::kFlushNoWAL);
}
} else {
- if (log_io_s.ok()) {
- Status new_bg_error = s;
- error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
- }
+ assert(s == log_io_s);
+ Status new_bg_error = s;
+ error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
diff --git a/db/error_handler.cc b/db/error_handler.cc
index 5b5b08f75..b00611ac9 100644
--- a/db/error_handler.cc
+++ b/db/error_handler.cc
@@ -272,8 +272,8 @@ STATIC_AVOID_DESTRUCTION(const Status, kOkStatus){Status::OK()};
// This can also get called as part of a recovery operation. In that case, we
// also track the error separately in recovery_error_ so we can tell in the
// end whether recovery succeeded or not
-const Status& ErrorHandler::SetBGError(const Status& bg_err,
- BackgroundErrorReason reason) {
+const Status& ErrorHandler::HandleKnownErrors(const Status& bg_err,
+ BackgroundErrorReason reason) {
db_mutex_->AssertHeld();
if (bg_err.ok()) {
return kOkStatus;
@@ -382,9 +382,12 @@ const Status& ErrorHandler::SetBGError(const Status& bg_err,
// c) all other errors are mapped to hard error.
// 3) for other cases, SetBGError(const Status& bg_err, BackgroundErrorReason
// reason) will be called to handle other error cases.
-const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
+const Status& ErrorHandler::SetBGError(const Status& bg_status,
BackgroundErrorReason reason) {
db_mutex_->AssertHeld();
+ Status tmp_status = bg_status;
+ IOStatus bg_io_err = status_to_io_status(std::move(tmp_status));
+
if (bg_io_err.ok()) {
return kOkStatus;
}
@@ -483,7 +486,11 @@ const Status& ErrorHandler::SetBGError(const IOStatus& bg_io_err,
if (bg_error_stats_ != nullptr) {
RecordTick(bg_error_stats_.get(), ERROR_HANDLER_BG_IO_ERROR_COUNT);
}
- return SetBGError(new_bg_io_err, reason);
+ // HandleKnownErrors() will use recovery_error_, so ignore
+ // recovery_io_error_.
+ // TODO: Do some refactoring and use only one recovery_error_
+ recovery_io_error_.PermitUncheckedError();
+ return HandleKnownErrors(new_bg_io_err, reason);
}
}
diff --git a/db/error_handler.h b/db/error_handler.h
index 3e05ee6cc..9dab56e91 100644
--- a/db/error_handler.h
+++ b/db/error_handler.h
@@ -53,9 +53,6 @@ class ErrorHandler {
const Status& SetBGError(const Status& bg_err, BackgroundErrorReason reason);
- const Status& SetBGError(const IOStatus& bg_io_err,
- BackgroundErrorReason reason);
-
Status GetBGError() const { return bg_error_; }
Status GetRecoveryError() const { return recovery_error_; }
@@ -112,6 +109,8 @@ class ErrorHandler {
// The pointer of DB statistics.
std::shared_ptr bg_error_stats_;
+ const Status& HandleKnownErrors(const Status& bg_err,
+ BackgroundErrorReason reason);
Status OverrideNoSpaceError(const Status& bg_error, bool* auto_recovery);
void RecoverFromNoSpace();
const Status& StartRecoverFromRetryableBGIOError(const IOStatus& io_error);
diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc
index c01273819..547d08732 100644
--- a/db/error_handler_fs_test.cc
+++ b/db/error_handler_fs_test.cc
@@ -2468,6 +2468,210 @@ TEST_F(DBErrorHandlingFSTest, FLushWritRetryableErrorAbortRecovery) {
Destroy(options);
}
+TEST_F(DBErrorHandlingFSTest, FlushReadError) {
+ std::shared_ptr listener =
+ std::make_shared();
+ Options options = GetDefaultOptions();
+ options.env = fault_env_.get();
+ options.create_if_missing = true;
+ options.listeners.emplace_back(listener);
+ options.statistics = CreateDBStatistics();
+ Status s;
+
+ listener->EnableAutoRecovery(false);
+ DestroyAndReopen(options);
+
+ ASSERT_OK(Put(Key(0), "val"));
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeOutputValidation", [&](void*) {
+ IOStatus st = IOStatus::IOError();
+ st.SetRetryable(true);
+ st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
+ fault_fs_->SetFilesystemActive(false, st);
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeDeleteFile",
+ [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+ SyncPoint::GetInstance()->EnableProcessing();
+ s = Flush();
+ ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+ SyncPoint::GetInstance()->DisableProcessing();
+ fault_fs_->SetFilesystemActive(true);
+ ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_ERROR_COUNT));
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_IO_ERROR_COUNT));
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+ ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_AUTORESUME_COUNT));
+ ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+ s = dbfull()->TEST_GetBGError();
+ ASSERT_OK(s);
+
+ Reopen(GetDefaultOptions());
+ ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, AtomicFlushReadError) {
+ std::shared_ptr listener =
+ std::make_shared();
+ Options options = GetDefaultOptions();
+ options.env = fault_env_.get();
+ options.create_if_missing = true;
+ options.listeners.emplace_back(listener);
+ options.statistics = CreateDBStatistics();
+ Status s;
+
+ listener->EnableAutoRecovery(false);
+ options.atomic_flush = true;
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ ASSERT_OK(Put(0, Key(0), "val"));
+ ASSERT_OK(Put(1, Key(0), "val"));
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeOutputValidation", [&](void*) {
+ IOStatus st = IOStatus::IOError();
+ st.SetRetryable(true);
+ st.SetScope(IOStatus::IOErrorScope::kIOErrorScopeFile);
+ fault_fs_->SetFilesystemActive(false, st);
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeDeleteFile",
+ [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+ SyncPoint::GetInstance()->EnableProcessing();
+ s = Flush({0, 1});
+ ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kSoftError);
+ SyncPoint::GetInstance()->DisableProcessing();
+ fault_fs_->SetFilesystemActive(true);
+ ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_ERROR_COUNT));
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_IO_ERROR_COUNT));
+ ASSERT_EQ(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_RETRYABLE_IO_ERROR_COUNT));
+ ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_AUTORESUME_COUNT));
+ ASSERT_LE(0, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_AUTORESUME_RETRY_TOTAL_COUNT));
+ s = dbfull()->TEST_GetBGError();
+ ASSERT_OK(s);
+
+ TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
+ GetDefaultOptions());
+ ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, AtomicFlushNoSpaceError) {
+ std::shared_ptr listener =
+ std::make_shared();
+ Options options = GetDefaultOptions();
+ options.env = fault_env_.get();
+ options.create_if_missing = true;
+ options.listeners.emplace_back(listener);
+ options.statistics = CreateDBStatistics();
+ Status s;
+
+ listener->EnableAutoRecovery(true);
+ options.atomic_flush = true;
+ CreateAndReopenWithCF({"pikachu"}, options);
+
+ ASSERT_OK(Put(0, Key(0), "val"));
+ ASSERT_OK(Put(1, Key(0), "val"));
+ SyncPoint::GetInstance()->SetCallBack("BuildTable:create_file", [&](void*) {
+ IOStatus st = IOStatus::NoSpace();
+ fault_fs_->SetFilesystemActive(false, st);
+ });
+ SyncPoint::GetInstance()->SetCallBack(
+ "BuildTable:BeforeDeleteFile",
+ [&](void*) { fault_fs_->SetFilesystemActive(true, IOStatus::OK()); });
+ SyncPoint::GetInstance()->EnableProcessing();
+ s = Flush({0, 1});
+ ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
+ SyncPoint::GetInstance()->DisableProcessing();
+ fault_fs_->SetFilesystemActive(true);
+ ASSERT_EQ(listener->WaitForRecovery(5000000), true);
+ ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_ERROR_COUNT));
+ ASSERT_LE(1, options.statistics->getAndResetTickerCount(
+ ERROR_HANDLER_BG_IO_ERROR_COUNT));
+ s = dbfull()->TEST_GetBGError();
+ ASSERT_OK(s);
+
+ TryReopenWithColumnFamilies({kDefaultColumnFamilyName, "pikachu"},
+ GetDefaultOptions());
+ ASSERT_EQ("val", Get(Key(0)));
+}
+
+TEST_F(DBErrorHandlingFSTest, CompactionReadRetryableErrorAutoRecover) {
+ // In this test, in the first round of compaction, the FS is set to error.
+ // So the first compaction fails due to retryable IO error and it is mapped
+ // to soft error. Then, compaction is rescheduled, in the second round of
+ // compaction, the FS is set to active and compaction is successful, so
+ // the test will hit the CompactionJob::FinishCompactionOutputFile1 sync
+ // point.
+ std::shared_ptr listener =
+ std::make_shared();
+ Options options = GetDefaultOptions();
+ options.env = fault_env_.get();
+ options.create_if_missing = true;
+ options.level0_file_num_compaction_trigger = 2;
+ options.listeners.emplace_back(listener);
+ BlockBasedTableOptions table_options;
+ table_options.no_block_cache = true;
+ options.table_factory.reset(NewBlockBasedTableFactory(table_options));
+ Status s;
+ std::atomic fail_first(false);
+ std::atomic fail_second(true);
+ Random rnd(301);
+ DestroyAndReopen(options);
+
+ IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
+ error_msg.SetRetryable(true);
+
+ for (int i = 0; i < 100; ++i) {
+ ASSERT_OK(Put(Key(i), rnd.RandomString(1024)));
+ }
+ s = Flush();
+ ASSERT_OK(s);
+
+ listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
+ listener->EnableAutoRecovery(false);
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"},
+ {"CompactionJob::FinishCompactionOutputFile1",
+ "CompactionWriteRetryableErrorAutoRecover0"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction:Start",
+ [&](void*) { fault_fs_->SetFilesystemActive(true); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BackgroundCallCompaction:0", [&](void*) { fail_first.store(true); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::Run():PausingManualCompaction:2", [&](void*) {
+ if (fail_first.load() && fail_second.load()) {
+ fault_fs_->SetFilesystemActive(false, error_msg);
+ fail_second.store(false);
+ }
+ });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+ ASSERT_OK(Put(Key(1), "val"));
+ s = Flush();
+ ASSERT_OK(s);
+
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(s);
+ TEST_SYNC_POINT("CompactionWriteRetryableErrorAutoRecover0");
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->DisableProcessing();
+
+ Reopen(GetDefaultOptions());
+}
+
class DBErrorHandlingFencingTest : public DBErrorHandlingFSTest,
public testing::WithParamInterface {};
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 85359a6f4..8f9160354 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -136,7 +136,6 @@ FlushJob::FlushJob(
}
FlushJob::~FlushJob() {
- io_status_.PermitUncheckedError();
ThreadStatusUtil::ResetThreadStatus();
}
@@ -290,17 +289,13 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
} else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
- IOStatus tmp_io_s;
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
- log_buffer_, &committed_flush_jobs_info_, &tmp_io_s,
+ log_buffer_, &committed_flush_jobs_info_,
!(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
but 'false' if mempurge successful: no new min log number
or new level 0 file path to write to manifest. */);
- if (!tmp_io_s.ok()) {
- io_status_ = tmp_io_s;
- }
}
if (s.ok() && file_meta != nullptr) {
@@ -926,9 +921,9 @@ Status FlushJob::WriteLevel0Table() {
job_context_->job_id, Env::IO_HIGH, &table_properties_, write_hint,
full_history_ts_low, blob_callback_, &num_input_entries,
&memtable_payload_bytes, &memtable_garbage_bytes);
- if (!io_s.ok()) {
- io_status_ = io_s;
- }
+ // TODO: Cleanup io_status in BuildTable and table builders
+ assert(!s.ok() || io_s.ok());
+ io_s.PermitUncheckedError();
if (num_input_entries != total_num_entries && s.ok()) {
std::string msg = "Expected " + ToString(total_num_entries) +
" entries in memtables, but read " +
diff --git a/db/flush_job.h b/db/flush_job.h
index 5c5b0c47e..76d5e34b6 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -93,9 +93,6 @@ class FlushJob {
}
#endif // !ROCKSDB_LITE
- // Return the IO status
- IOStatus io_status() const { return io_status_; }
-
private:
void ReportStartedFlush();
void ReportFlushInputSize(const autovector& mems);
@@ -184,7 +181,6 @@ class FlushJob {
Version* base_;
bool pick_memtable_called;
Env::Priority thread_pri_;
- IOStatus io_status_;
const std::shared_ptr io_tracer_;
SystemClock* clock_;
diff --git a/db/memtable_list.cc b/db/memtable_list.cc
index 857147968..133562b7b 100644
--- a/db/memtable_list.cc
+++ b/db/memtable_list.cc
@@ -407,7 +407,7 @@ Status MemTableList::TryInstallMemtableFlushResults(
autovector* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list>* committed_flush_jobs_info,
- IOStatus* io_s, bool write_edits) {
+ bool write_edits) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
@@ -529,7 +529,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
db_directory, /*new_descriptor_log=*/false,
/*column_family_options=*/nullptr,
manifest_write_cb);
- *io_s = vset->io_status();
} else {
// If write_edit is false (e.g: successful mempurge),
// then remove old memtables, wake up manifest write queue threads,
@@ -545,7 +544,6 @@ Status MemTableList::TryInstallMemtableFlushResults(
// TODO(bjlemaire): explain full reason WakeUpWaitingManifestWriters
// needed or investigate more.
vset->WakeUpWaitingManifestWriters();
- *io_s = IOStatus::OK();
}
}
}
diff --git a/db/memtable_list.h b/db/memtable_list.h
index a3e604a6a..6df0e7c02 100644
--- a/db/memtable_list.h
+++ b/db/memtable_list.h
@@ -270,7 +270,7 @@ class MemTableList {
autovector* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list>* committed_flush_jobs_info,
- IOStatus* io_s, bool write_edits = true);
+ bool write_edits = true);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().
diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc
index e801ac04f..df1694c21 100644
--- a/db/memtable_list_test.cc
+++ b/db/memtable_list_test.cc
@@ -124,7 +124,7 @@ class MemTableListTest : public testing::Test {
std::list> flush_jobs_info;
Status s = list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
- file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
+ file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
EXPECT_OK(io_s);
return s;
}