Pass IOStatus to write path and set retryable IO Error as hard error in BG jobs (#6487)

Summary:
In the current code base, we use Status to get and store the returned status from the call. Specifically, for IO related functions, the current Status cannot reflect the IO Error details such as error scope, error retryable attribute, and others. With the implementation of https://github.com/facebook/rocksdb/issues/5761, we have the new Wrapper for IO, which returns IOStatus instead of Status. However, the IOStatus is purged at the lower level of write path and transferred to Status.

The first job of this PR is to pass the IOStatus to the write path (flush, WAL write, and Compaction). The second job is to identify the Retryable IO Error as HardError, and set the bg_error_ as HardError. In this case, the DB Instance becomes read only. User is informed of the Status and need to take actions to deal with it (e.g., call db->Resume()).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6487

Test Plan: Added the testing case to error_handler_fs_test. Pass make asan_check

Reviewed By: anand1976

Differential Revision: D20685017

Pulled By: zhichao-cao

fbshipit-source-id: ff85f042896243abcd6ef37877834e26f36b6eb0
main
Zhichao Cao 5 years ago committed by Facebook GitHub Bot
parent 2e276973e4
commit 4246888101
  1. 22
      db/builder.cc
  2. 2
      db/builder.h
  3. 26
      db/compaction/compaction_job.cc
  4. 4
      db/compaction/compaction_job.h
  5. 2
      db/compaction/compaction_job_test.cc
  6. 34
      db/db_impl/db_impl.cc
  7. 24
      db/db_impl/db_impl.h
  8. 91
      db/db_impl/db_impl_compaction_flush.cc
  9. 5
      db/db_impl/db_impl_open.cc
  10. 104
      db/db_impl/db_impl_write.cc
  11. 40
      db/error_handler.cc
  12. 3
      db/error_handler.h
  13. 305
      db/error_handler_fs_test.cc
  14. 14
      db/flush_job.cc
  15. 4
      db/flush_job.h
  16. 2
      db/flush_job_test.cc
  17. 14
      db/log_writer.cc
  18. 9
      db/log_writer.h
  19. 5
      db/memtable_list.cc
  20. 3
      db/memtable_list.h
  21. 3
      db/memtable_list_test.cc
  22. 3
      db/repair.cc
  23. 42
      db/version_set.cc
  24. 9
      db/version_set.h
  25. 30
      db/version_set_test.cc
  26. 16
      env/composite_env_wrapper.h
  27. 16
      env/env.cc
  28. 25
      env/file_system.cc
  29. 12
      file/filename.cc
  30. 6
      file/filename.h
  31. 46
      file/writable_file_writer.cc
  32. 27
      file/writable_file_writer.h
  33. 9
      include/rocksdb/file_system.h
  34. 15
      include/rocksdb/io_status.h
  35. 27
      table/block_based/block_based_table_builder.cc
  36. 3
      table/block_based/block_based_table_builder.h
  37. 38
      table/cuckoo/cuckoo_table_builder.cc
  38. 4
      table/cuckoo/cuckoo_table_builder.h
  39. 4
      table/mock_table.h
  40. 95
      table/plain/plain_table_builder.cc
  41. 6
      table/plain/plain_table_builder.h
  42. 41
      table/plain/plain_table_key_coding.cc
  43. 5
      table/plain/plain_table_key_coding.h
  44. 3
      table/table_builder.h

@ -81,10 +81,11 @@ Status BuildTable(
SnapshotChecker* snapshot_checker, const CompressionType compression,
uint64_t sample_for_compression, const CompressionOptions& compression_opts,
bool paranoid_file_checks, InternalStats* internal_stats,
TableFileCreationReason reason, EventLogger* event_logger, int job_id,
const Env::IOPriority io_priority, TableProperties* table_properties,
int level, const uint64_t creation_time, const uint64_t oldest_key_time,
Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time) {
TableFileCreationReason reason, IOStatus* io_status,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint,
const uint64_t file_creation_time) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@ -185,11 +186,13 @@ Status BuildTable(
tp = builder->GetTableProperties();
bool empty = builder->NumEntries() == 0 && tp.num_range_deletions == 0;
s = c_iter.status();
TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable");
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish();
}
*io_status = builder->io_status();
if (s.ok() && !empty) {
uint64_t file_size = builder->FileSize();
@ -209,11 +212,16 @@ Status BuildTable(
// Finish and check for file errors
if (s.ok() && !empty) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
s = file_writer->Sync(ioptions.use_fsync);
*io_status = file_writer->Sync(ioptions.use_fsync);
}
if (s.ok() && !empty) {
s = file_writer->Close();
if (io_status->ok() && !empty) {
*io_status = file_writer->Close();
}
if (!io_status->ok()) {
s = *io_status;
}
// TODO Also check the IO status when create the Iterator.
if (s.ok() && !empty) {
// Verify that the table is usable

@ -78,7 +78,7 @@ extern Status BuildTable(
const uint64_t sample_for_compression,
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger = nullptr, int job_id = 0,
IOStatus* io_status, EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0,

@ -613,8 +613,13 @@ Status CompactionJob::Run() {
}
}
IOStatus io_s;
if (status.ok() && output_directory_) {
status = output_directory_->Fsync(IOOptions(), nullptr);
io_s = output_directory_->Fsync(IOOptions(), nullptr);
}
if (!io_s.ok()) {
io_status_ = io_s;
status = io_s;
}
if (status.ok()) {
@ -713,9 +718,13 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
versions_->SetIOStatusOK();
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
}
if (!versions_->io_status().ok()) {
io_status_ = versions_->io_status();
}
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
@ -1294,6 +1303,10 @@ Status CompactionJob::FinishCompactionOutputFile(
} else {
sub_compact->builder->Abandon();
}
if (!sub_compact->builder->io_status().ok()) {
io_status_ = sub_compact->builder->io_status();
s = io_status_;
}
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
// Add the checksum information to file metadata.
@ -1307,12 +1320,17 @@ Status CompactionJob::FinishCompactionOutputFile(
sub_compact->total_bytes += current_bytes;
// Finish and check for file errors
IOStatus io_s;
if (s.ok()) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
s = sub_compact->outfile->Sync(db_options_.use_fsync);
io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
}
if (s.ok()) {
s = sub_compact->outfile->Close();
if (io_s.ok()) {
io_s = sub_compact->outfile->Close();
}
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
}
sub_compact->outfile.reset();

@ -100,6 +100,9 @@ class CompactionJob {
// Add compaction input/output to the current version
Status Install(const MutableCFOptions& mutable_cf_options);
// Return the IO status
IOStatus io_status() const { return io_status_; }
private:
struct SubcompactionState;
@ -193,6 +196,7 @@ class CompactionJob {
std::vector<uint64_t> sizes_;
Env::WriteLifeTimeHint write_hint_;
Env::Priority thread_pri_;
IOStatus io_status_;
};
} // namespace ROCKSDB_NAMESPACE

@ -281,7 +281,7 @@ class CompactionJobTest : public testing::Test {
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
std::vector<ColumnFamilyDescriptor> column_families;
cf_options_.table_factory = mock_table_factory_;

@ -311,6 +311,11 @@ Status DBImpl::ResumeImpl() {
s = bg_error;
}
// Make sure the IO Status stored in version set is set to OK.
if(s.ok()) {
versions_->SetIOStatusOK();
}
// We cannot guarantee consistency of the WAL. So force flush Memtables of
// all the column families
if (s.ok()) {
@ -1146,25 +1151,25 @@ int DBImpl::FindMinimumEmptyLevelFitting(
Status DBImpl::FlushWAL(bool sync) {
if (manual_wal_flush_) {
Status s;
IOStatus io_s;
{
// We need to lock log_write_mutex_ since logs_ might change concurrently
InstrumentedMutexLock wl(&log_write_mutex_);
log::Writer* cur_log_writer = logs_.back().writer;
s = cur_log_writer->WriteBuffer();
io_s = cur_log_writer->WriteBuffer();
}
if (!s.ok()) {
if (!io_s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
s.ToString().c_str());
io_s.ToString().c_str());
// In case there is a fs error we should set it globally to prevent the
// future writes
WriteStatusCheck(s);
IOStatusCheck(io_s);
// whether sync or not, we should abort the rest of function upon error
return s;
return std::move(io_s);
}
if (!sync) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
return s;
return std::move(io_s);
}
}
if (!sync) {
@ -1216,12 +1221,21 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
RecordTick(stats_, WAL_FILE_SYNCED);
Status status;
IOStatus io_s;
for (log::Writer* log : logs_to_sync) {
status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
if (!status.ok()) {
io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync);
if (!io_s.ok()) {
status = io_s;
break;
}
}
if (!io_s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s",
io_s.ToString().c_str());
// In case there is a fs error we should set it globally to prevent the
// future writes
IOStatusCheck(io_s);
}
if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
}
@ -1248,7 +1262,7 @@ Status DBImpl::LockWAL() {
// future writes
WriteStatusCheck(status);
}
return status;
return std::move(status);
}
Status DBImpl::UnlockWAL() {

@ -1364,7 +1364,7 @@ class DBImpl : public DB {
void ReleaseFileNumberFromPendingOutputs(
std::unique_ptr<std::list<uint64_t>::iterator>& v);
Status SyncClosedLogs(JobContext* job_context);
IOStatus SyncClosedLogs(JobContext* job_context);
// Flush the in-memory write buffer to storage. Switches to a new
// log-file/memtable and writes a new descriptor iff successful. Then
@ -1501,21 +1501,25 @@ class DBImpl : public DB {
WriteBatch* tmp_batch, size_t* write_with_wal,
WriteBatch** to_be_cached_state);
Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size);
IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size);
Status WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence);
IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence);
Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used, SequenceNumber* last_sequence,
size_t seq_inc);
IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used,
SequenceNumber* last_sequence, size_t seq_inc);
// Used by WriteImpl to update bg_error_ if paranoid check is enabled.
void WriteStatusCheck(const Status& status);
// Used by WriteImpl to update bg_error_ when IO error happens, e.g., write
// WAL, sync WAL fails, if paranoid check is enabled.
void IOStatusCheck(const IOStatus& status);
// Used by WriteImpl to update bg_error_ in case of memtable insert error.
void MemTableInsertStatusCheck(const Status& memtable_insert_status);

@ -79,7 +79,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force,
return false;
}
Status DBImpl::SyncClosedLogs(JobContext* job_context) {
IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start");
mutex_.AssertHeld();
autovector<log::Writer*, 1> logs_to_sync;
@ -96,7 +96,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
logs_to_sync.push_back(log.writer);
}
Status s;
IOStatus io_s;
if (!logs_to_sync.empty()) {
mutex_.Unlock();
@ -104,34 +104,34 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number());
s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!s.ok()) {
io_s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!io_s.ok()) {
break;
}
if (immutable_db_options_.recycle_log_file_num > 0) {
s = log->Close();
if (!s.ok()) {
io_s = log->Close();
if (!io_s.ok()) {
break;
}
}
}
if (s.ok()) {
s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
if (io_s.ok()) {
io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
}
mutex_.Lock();
// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, s);
if (!s.ok()) {
error_handler_.SetBGError(s, BackgroundErrorReason::kFlush);
MarkLogsSynced(current_log_number - 1, true, io_s);
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return s;
return io_s;
}
}
return s;
return io_s;
}
Status DBImpl::FlushMemTableToOutputFile(
@ -155,7 +155,6 @@ Status DBImpl::FlushMemTableToOutputFile(
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats,
true /* sync_output_directory */, true /* write_manifest */, thread_pri);
FileMetaData file_meta;
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
@ -168,6 +167,7 @@ Status DBImpl::FlushMemTableToOutputFile(
#endif // ROCKSDB_LITE
Status s;
IOStatus io_s;
if (logfile_number_ > 0 &&
versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) {
// If there are more than one column families, we need to make sure that
@ -176,7 +176,8 @@ Status DBImpl::FlushMemTableToOutputFile(
// flushed SST may contain data from write batches whose updates to
// other column families are missing.
// SyncClosedLogs() may unlock and re-lock the db_mutex.
s = SyncClosedLogs(job_context);
io_s = SyncClosedLogs(job_context);
s = io_s;
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
}
@ -192,6 +193,7 @@ Status DBImpl::FlushMemTableToOutputFile(
} else {
flush_job.Cancel();
}
io_s = flush_job.io_status();
if (s.ok()) {
InstallSuperVersionAndScheduleWork(cfd, superversion_context,
@ -206,8 +208,12 @@ Status DBImpl::FlushMemTableToOutputFile(
}
if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
if (!io_s.ok()&& !io_s.IsShutdownInProgress() && !io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
if (s.ok()) {
#ifndef ROCKSDB_LITE
@ -344,6 +350,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
std::vector<FileMetaData> file_meta(num_cfs);
Status s;
IOStatus io_s;
assert(num_cfs == static_cast<int>(jobs.size()));
#ifndef ROCKSDB_LITE
@ -358,15 +365,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
if (logfile_number_ > 0) {
// TODO (yanqin) investigate whether we should sync the closed logs for
// single column family case.
s = SyncClosedLogs(job_context);
io_s = SyncClosedLogs(job_context);
s = io_s;
}
// exec_status stores the execution status of flush_jobs as
// <bool /* executed */, Status /* status code */>
autovector<std::pair<bool, Status>> exec_status;
autovector<IOStatus> io_status;
for (int i = 0; i != num_cfs; ++i) {
// Initially all jobs are not executed, with status OK.
exec_status.emplace_back(false, Status::OK());
io_status.emplace_back(IOStatus::OK());
}
if (s.ok()) {
@ -375,6 +385,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
exec_status[i].second =
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
io_status[i] = jobs[i]->io_status();
}
if (num_cfs > 1) {
TEST_SYNC_POINT(
@ -387,6 +398,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
exec_status[0].second =
jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;
io_status[0] = jobs[0]->io_status();
Status error_status;
for (const auto& e : exec_status) {
@ -405,6 +417,20 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = error_status.ok() ? s : error_status;
}
if (io_s.ok()) {
IOStatus io_error = IOStatus::OK();
for (int i = 0; i != static_cast<int>(io_status.size()); i++) {
if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() &&
!io_status[i].IsColumnFamilyDropped()) {
io_error = io_status[i];
}
}
io_s = io_error;
if (s.ok() && !io_s.ok()) {
s = io_s;
}
}
if (s.IsColumnFamilyDropped()) {
s = Status::OK();
}
@ -543,9 +569,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
#endif // ROCKSDB_LITE
}
if (!s.ok() && !s.IsShutdownInProgress()) {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!io_s.ok() && io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
}
return s;
@ -2633,6 +2665,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
}
IOStatus io_s;
if (!c) {
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do");
@ -2654,9 +2687,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (const auto& f : *c->inputs(0)) {
c->edit()->DeleteFile(c->level(), f->fd.GetNumber());
}
versions_->SetIOStatusOK();
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
io_s = versions_->io_status();
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
*c->mutable_cf_options());
@ -2710,9 +2745,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
}
}
versions_->SetIOStatusOK();
status = versions_->LogAndApply(c->column_family_data(),
*c->mutable_cf_options(), c->edit(),
&mutex_, directories_.GetDbDir());
io_s = versions_->io_status();
// Use latest MutableCFOptions
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
@ -2799,6 +2836,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
mutex_.Lock();
status = compaction_job.Install(*c->mutable_cf_options());
io_s = compaction_job.io_status();
if (status.ok()) {
InstallSuperVersionAndScheduleWork(c->column_family_data(),
&job_context->superversion_contexts[0],
@ -2808,6 +2846,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction",
c->column_family_data());
}
if (status.ok() && !io_s.ok()) {
status = io_s;
}
if (c != nullptr) {
c->ReleaseCompactionFiles(status);
*made_progress = true;
@ -2833,7 +2876,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
} else {
ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s",
status.ToString().c_str());
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
}
if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
// Put this cfd back in the compaction queue so we can retry after some
// time

@ -292,7 +292,7 @@ Status DBImpl::NewDB() {
}
if (s.ok()) {
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir());
s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir());
} else {
fs_->DeleteFile(manifest, IOOptions(), nullptr);
}
@ -1239,6 +1239,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
IOStatus io_s;
s = BuildTable(
dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
file_options_for_compaction_, cfd->table_cache(), iter.get(),
@ -1248,7 +1249,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
mutable_cf_options.sample_for_compression,
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);
LogFlush(immutable_db_options_.info_log);

@ -101,6 +101,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
disable_memtable);
Status status;
IOStatus io_s;
if (write_options.low_pri) {
status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!status.ok()) {
@ -322,21 +323,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (!two_write_queues_) {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, last_sequence + 1);
}
} else {
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastAllocatedSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
seq_inc);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
}
}
status = io_s;
assert(last_sequence != kMaxSequenceNumber);
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += seq_inc;
@ -411,7 +413,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
WriteStatusCheck(status);
if (!io_s.ok()) {
IOStatusCheck(io_s);
} else {
WriteStatusCheck(status);
}
}
if (need_log_sync) {
@ -515,6 +521,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
PERF_TIMER_STOP(write_pre_and_post_process_time);
IOStatus io_s;
if (w.status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
@ -524,12 +531,17 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
}
w.status = WriteToWAL(wal_write_group, log_writer, log_used,
need_log_sync, need_log_dir_sync, current_sequence);
io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, current_sequence);
w.status = io_s;
}
if (!w.CallbackFailed()) {
WriteStatusCheck(w.status);
if (!io_s.ok()) {
IOStatusCheck(io_s);
} else {
WriteStatusCheck(w.status);
}
}
if (need_log_sync) {
@ -740,9 +752,10 @@ Status DBImpl::WriteImplWALOnly(
}
seq_inc = total_batch_cnt;
}
IOStatus io_s;
if (!write_options.disableWAL) {
status =
ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);
status = io_s;
} else {
// Otherwise we inc seq number to do solely the seq allocation
last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc);
@ -777,7 +790,11 @@ Status DBImpl::WriteImplWALOnly(
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
WriteStatusCheck(status);
if (!io_s.ok()) {
IOStatusCheck(io_s);
} else {
WriteStatusCheck(status);
}
}
if (status.ok()) {
size_t index = 0;
@ -823,6 +840,17 @@ void DBImpl::WriteStatusCheck(const Status& status) {
}
}
void DBImpl::IOStatusCheck(const IOStatus& io_status) {
// Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes.
if (immutable_db_options_.paranoid_checks && !io_status.ok() &&
!io_status.IsBusy() && !io_status.IsIncomplete()) {
mutex_.Lock();
error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
mutex_.Unlock();
}
}
void DBImpl::MemTableInsertStatusCheck(const Status& status) {
// A non-OK status here indicates that the state implied by the
// WAL has diverged from the in-memory state. This could be
@ -961,9 +989,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
// When two_write_queues_ is disabled, this function is called from the only
// write thread. Otherwise this must be called holding log_write_mutex_.
Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size) {
IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
uint64_t* log_size) {
assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
*log_size = log_entry.size();
@ -978,7 +1006,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Lock();
}
Status status = log_writer->AddRecord(log_entry);
IOStatus io_s = log_writer->AddRecord(log_entry);
if (UNLIKELY(needs_locking)) {
log_write_mutex_.Unlock();
}
@ -990,15 +1019,14 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
// since alive_log_files_ might be modified concurrently
alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false;
return status;
return io_s;
}
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
Status status;
IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
IOStatus io_s;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
size_t write_with_wal = 0;
@ -1016,13 +1044,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
WriteBatchInternal::SetSequence(merged_batch, sequence);
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
if (status.ok() && need_log_sync) {
if (io_s.ok() && need_log_sync) {
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
@ -1032,23 +1060,24 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
// - as long as other threads don't modify it, it's safe to read
// from std::deque from multiple threads concurrently.
for (auto& log : logs_) {
status = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!status.ok()) {
io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync);
if (!io_s.ok()) {
break;
}
}
if (status.ok() && need_log_dir_sync) {
if (io_s.ok() && need_log_dir_sync) {
// We only sync WAL directory the first time WAL syncing is
// requested, so that in case users never turn on WAL sync,
// we can avoid the disk I/O in the write code path.
status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr);
}
}
if (merged_batch == &tmp_batch_) {
tmp_batch_.Clear();
}
if (status.ok()) {
if (io_s.ok()) {
auto stats = default_cf_internal_stats_;
if (need_log_sync) {
stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1);
@ -1059,14 +1088,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}
return status;
return io_s;
}
Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used,
SequenceNumber* last_sequence,
size_t seq_inc) {
Status status;
IOStatus DBImpl::ConcurrentWriteToWAL(
const WriteThread::WriteGroup& write_group, uint64_t* log_used,
SequenceNumber* last_sequence, size_t seq_inc) {
IOStatus io_s;
assert(!write_group.leader->disable_wal);
// Same holds for all in the batch group
@ -1092,14 +1120,14 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer = logs_.back().writer;
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (to_be_cached_state) {
cached_recoverable_state_ = *to_be_cached_state;
cached_recoverable_state_empty_ = false;
}
log_write_mutex_.Unlock();
if (status.ok()) {
if (io_s.ok()) {
const bool concurrent = true;
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size,
@ -1109,7 +1137,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
concurrent);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
}
return status;
return io_s;
}
Status DBImpl::WriteRecoverableState() {

@ -238,6 +238,46 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas
return bg_error_;
}
Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
BackgroundErrorReason reason) {
db_mutex_->AssertHeld();
if (bg_io_err.ok()) {
return Status::OK();
}
if (recovery_in_prog_ && recovery_error_.ok()) {
recovery_error_ = bg_io_err;
}
Status new_bg_io_err = bg_io_err;
Status s;
if (bg_io_err.GetDataLoss()) {
// FIrst, data loss is treated as unrecoverable error. So it can directly
// overwrite any existing bg_error_.
bool auto_recovery = false;
Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError);
bg_error_ = bg_err;
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s,
db_mutex_, &auto_recovery);
return bg_error_;
} else if (bg_io_err.GetRetryable()) {
// Second, check if the error is a retryable IO error or not. if it is
// retryable error and its severity is higher than bg_error_, overwrite
// the bg_error_ with new error.
// In current stage, treat retryable error as HardError. No automatic
// recovery.
bool auto_recovery = false;
Status bg_err(new_bg_io_err, Status::Severity::kHardError);
EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, &s,
db_mutex_, &auto_recovery);
if (bg_err.severity() > bg_error_.severity()) {
bg_error_ = bg_err;
}
return bg_error_;
} else {
s = SetBGError(new_bg_io_err, reason);
}
return s;
}
Status ErrorHandler::OverrideNoSpaceError(Status bg_error,
bool* auto_recovery) {
#ifndef ROCKSDB_LITE

@ -6,6 +6,7 @@
#include "monitoring/instrumented_mutex.h"
#include "options/db_options.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/status.h"
@ -34,6 +35,8 @@ class ErrorHandler {
Status SetBGError(const Status& bg_err, BackgroundErrorReason reason);
Status SetBGError(const IOStatus& bg_io_err, BackgroundErrorReason reason);
Status GetBGError() { return bg_error_; }
Status GetRecoveryError() { return recovery_error_; }

@ -181,6 +181,69 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) {
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_fs_env.get();
options.create_if_missing = true;
options.listeners.emplace_back(listener);
Status s;
listener->EnableAutoRecovery(false);
DestroyAndReopen(options);
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
Put(Key(1), "val1");
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeFinishBuildTable",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_fs->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Reopen(options);
ASSERT_EQ("val1", Get(Key(1)));
Put(Key(2), "val2");
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeSyncTable",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_fs->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Reopen(options);
ASSERT_EQ("val2", Get(Key(2)));
Put(Key(3), "val3");
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:BeforeCloseTableFile",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->DisableProcessing();
fault_fs->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Reopen(options);
ASSERT_EQ("val3", Get(Key(3)));
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
@ -224,6 +287,51 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) {
Close();
}
TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_fs_env.get();
options.create_if_missing = true;
options.listeners.emplace_back(listener);
Status s;
std::string old_manifest;
std::string new_manifest;
listener->EnableAutoRecovery(false);
DestroyAndReopen(options);
old_manifest = GetManifestNameFromLiveFiles();
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
Put(Key(0), "val");
Flush();
Put(Key(1), "val");
SyncPoint::GetInstance()->SetCallBack(
"VersionSet::LogAndApply:WriteManifest",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
SyncPoint::GetInstance()->EnableProcessing();
s = Flush();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
fault_fs->SetFilesystemActive(true);
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
new_manifest = GetManifestNameFromLiveFiles();
ASSERT_NE(new_manifest, old_manifest);
Reopen(options);
ASSERT_EQ("val", Get(Key(0)));
ASSERT_EQ("val", Get(Key(1)));
Close();
}
TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
@ -347,6 +455,61 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) {
Close();
}
TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_fs_env.get();
options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(listener);
Status s;
std::string old_manifest;
std::string new_manifest;
DestroyAndReopen(options);
old_manifest = GetManifestNameFromLiveFiles();
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
Put(Key(0), "val");
Put(Key(2), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
listener->EnableAutoRecovery(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put(Key(1), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
fault_fs->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
new_manifest = GetManifestNameFromLiveFiles();
ASSERT_NE(new_manifest, old_manifest);
Reopen(options);
ASSERT_EQ("val", Get(Key(0)));
ASSERT_EQ("val", Get(Key(1)));
ASSERT_EQ("val", Get(Key(2)));
Close();
}
TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
@ -391,6 +554,53 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) {
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_fs_env.get();
options.create_if_missing = true;
options.level0_file_num_compaction_trigger = 2;
options.listeners.emplace_back(listener);
Status s;
DestroyAndReopen(options);
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
Put(Key(0), "va;");
Put(Key(2), "va;");
s = Flush();
ASSERT_EQ(s, Status::OK());
listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError));
listener->EnableAutoRecovery(false);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:FlushMemTableFinished",
"BackgroundCallCompaction:0"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"BackgroundCallCompaction:0",
[&](void*) { fault_fs->SetFilesystemActive(false, error_msg); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put(Key(1), "val");
s = Flush();
ASSERT_EQ(s, Status::OK());
s = dbfull()->TEST_WaitForCompact();
ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError);
fault_fs->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
Destroy(options);
}
TEST_F(DBErrorHandlingFSTest, CorruptionError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
@ -566,6 +776,101 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) {
Close();
}
TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));
std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
std::shared_ptr<ErrorHandlerFSListener> listener(
new ErrorHandlerFSListener());
Options options = GetDefaultOptions();
options.env = fault_fs_env.get();
options.create_if_missing = true;
options.writable_file_max_buffer_size = 32768;
options.listeners.emplace_back(listener);
options.paranoid_checks = true;
Status s;
Random rnd(301);
DestroyAndReopen(options);
IOStatus error_msg = IOStatus::IOError("Retryable IO Error");
error_msg.SetRetryable(true);
// For the first batch, write is successful, require sync
{
WriteBatch batch;
for (auto i = 0; i < 100; ++i) {
batch.Put(Key(i), RandomString(&rnd, 1024));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
};
// For the second batch, the first 2 file Append are successful, then the
// following Append fails due to file system retryable IOError.
{
WriteBatch batch;
int write_error = 0;
for (auto i = 100; i < 200; ++i) {
batch.Put(Key(i), RandomString(&rnd, 1024));
}
SyncPoint::GetInstance()->SetCallBack(
"WritableFileWriter::Append:BeforePrepareWrite", [&](void*) {
write_error++;
if (write_error > 2) {
fault_fs->SetFilesystemActive(false, error_msg);
}
});
SyncPoint::GetInstance()->EnableProcessing();
WriteOptions wopts;
wopts.sync = true;
s = dbfull()->Write(wopts, &batch);
ASSERT_EQ(true, s.IsIOError());
}
fault_fs->SetFilesystemActive(true);
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
// Data in corrupted WAL are not stored
for (auto i = 0; i < 199; ++i) {
if (i < 100) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
}
// Resume and write a new batch, should be in the WAL
s = dbfull()->Resume();
ASSERT_EQ(s, Status::OK());
{
WriteBatch batch;
for (auto i = 200; i < 300; ++i) {
batch.Put(Key(i), RandomString(&rnd, 1024));
}
WriteOptions wopts;
wopts.sync = true;
ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK());
};
Reopen(options);
for (auto i = 0; i < 300; ++i) {
if (i < 100 || i >= 200) {
ASSERT_NE(Get(Key(i)), "NOT_FOUND");
} else {
ASSERT_EQ(Get(Key(i)), "NOT_FOUND");
}
}
Close();
}
TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) {
std::shared_ptr<FaultInjectionTestFS> fault_fs(
new FaultInjectionTestFS(FileSystem::Default()));

@ -238,10 +238,14 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
} else if (write_manifest_) {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table
IOStatus tmp_io_s;
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_, &committed_flush_jobs_info_);
log_buffer_, &committed_flush_jobs_info_, &tmp_io_s);
if (!tmp_io_s.ok()) {
io_status_ = tmp_io_s;
}
}
if (s.ok() && file_meta != nullptr) {
@ -371,6 +375,7 @@ Status FlushJob::WriteLevel0Table() {
meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
meta_.file_creation_time = current_time;
IOStatus io_s;
s = BuildTable(
dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
@ -381,10 +386,13 @@ Status FlushJob::WriteLevel0Table() {
output_compression_, mutable_cf_options_.sample_for_compression,
cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */,
TableFileCreationReason::kFlush, &io_s, event_logger_,
job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */,
meta_.oldest_ancester_time, oldest_key_time, write_hint,
current_time);
if (!io_s.ok()) {
io_status_ = io_s;
}
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,

@ -90,6 +90,9 @@ class FlushJob {
}
#endif // !ROCKSDB_LITE
// Return the IO status
IOStatus io_status() const { return io_status_; }
private:
void ReportStartedFlush();
void ReportFlushInputSize(const autovector<MemTable*>& mems);
@ -154,6 +157,7 @@ class FlushJob {
Version* base_;
bool pick_memtable_called;
Env::Priority thread_pri_;
IOStatus io_status_;
};
} // namespace ROCKSDB_NAMESPACE

@ -108,7 +108,7 @@ class FlushJobTest : public testing::Test {
}
ASSERT_OK(s);
// Make "CURRENT" file that points to the new manifest file.
s = SetCurrentFile(env_, dbname_, 1, nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
}
Env* env_;

@ -37,10 +37,10 @@ Writer::~Writer() {
}
}
Status Writer::WriteBuffer() { return dest_->Flush(); }
IOStatus Writer::WriteBuffer() { return dest_->Flush(); }
Status Writer::Close() {
Status s;
IOStatus Writer::Close() {
IOStatus s;
if (dest_) {
s = dest_->Close();
dest_.reset();
@ -48,7 +48,7 @@ Status Writer::Close() {
return s;
}
Status Writer::AddRecord(const Slice& slice) {
IOStatus Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
@ -59,7 +59,7 @@ Status Writer::AddRecord(const Slice& slice) {
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
IOStatus s;
bool begin = true;
do {
const int64_t leftover = kBlockSize - block_offset_;
@ -114,7 +114,7 @@ Status Writer::AddRecord(const Slice& slice) {
bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); }
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes
size_t header_size;
@ -150,7 +150,7 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
EncodeFixed32(buf, crc);
// Write the header and the payload
Status s = dest_->Append(Slice(buf, header_size));
IOStatus s = dest_->Append(Slice(buf, header_size));
if (s.ok()) {
s = dest_->Append(Slice(ptr, n));
}

@ -13,6 +13,7 @@
#include <memory>
#include "db/log_format.h"
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@ -79,16 +80,16 @@ class Writer {
~Writer();
Status AddRecord(const Slice& slice);
IOStatus AddRecord(const Slice& slice);
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }
uint64_t get_log_number() const { return log_number_; }
Status WriteBuffer();
IOStatus WriteBuffer();
Status Close();
IOStatus Close();
bool TEST_BufferIsEmpty();
@ -103,7 +104,7 @@ class Writer {
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
IOStatus EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::WriteBuffer()

@ -390,7 +390,8 @@ Status MemTableList::TryInstallMemtableFlushResults(
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info) {
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
IOStatus* io_s) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld();
@ -471,8 +472,10 @@ Status MemTableList::TryInstallMemtableFlushResults(
}
// this can release and reacquire the mutex.
vset->SetIOStatusOK();
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory);
*io_s = vset->io_status();
// we will be changing the version in the next code path,
// so we better create a new one, since versions are immutable

@ -266,7 +266,8 @@ class MemTableList {
VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
autovector<MemTable*>* to_delete, FSDirectory* db_directory,
LogBuffer* log_buffer,
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info);
std::list<std::unique_ptr<FlushJobInfo>>* committed_flush_jobs_info,
IOStatus* io_s);
// New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add().

@ -114,13 +114,14 @@ class MemTableListTest : public testing::Test {
auto cfd = column_family_set->GetDefault();
EXPECT_TRUE(nullptr != cfd);
uint64_t file_num = file_number.fetch_add(1);
IOStatus io_s;
// Create dummy mutex.
InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
Status s = list->TryInstallMemtableFlushResults(
cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
return s;
}

@ -424,6 +424,7 @@ class Repairer {
}
LegacyFileSystemWrapper fs(env_);
IOStatus io_s;
status = BuildTable(
dbname_, env_, &fs, *cfd->ioptions(),
*cfd->GetLatestMutableCFOptions(), env_options_, table_cache_,
@ -432,7 +433,7 @@ class Repairer {
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber,
snapshot_checker, kNoCompression, 0 /* sample_for_compression */,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery, nullptr /* event_logger */,
TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */,
0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
-1 /* level */, current_time, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,

@ -3796,6 +3796,7 @@ Status VersionSet::ProcessManifestWrites(
uint64_t new_manifest_file_size = 0;
Status s;
IOStatus io_s;
assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
@ -3906,15 +3907,19 @@ Status VersionSet::ProcessManifestWrites(
}
++idx;
#endif /* !NDEBUG */
s = descriptor_log_->AddRecord(record);
if (!s.ok()) {
io_s = descriptor_log_->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
break;
}
}
if (s.ok()) {
s = SyncManifest(env_, db_options_, descriptor_log_->file());
io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
}
if (!s.ok()) {
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
s.ToString().c_str());
}
@ -3923,8 +3928,12 @@ Status VersionSet::ProcessManifestWrites(
// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && new_descriptor_log) {
s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_,
db_directory);
io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_,
db_directory);
if (!io_s.ok()) {
io_status_ = io_s;
s = io_s;
}
TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
}
@ -5194,9 +5203,10 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption("Unable to Encode VersionEdit:" +
edit_for_db_id.DebugString(true));
}
Status add_record = log->AddRecord(db_id_record);
if (!add_record.ok()) {
return add_record;
IOStatus io_s = log->AddRecord(db_id_record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
}
}
@ -5221,9 +5231,10 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;
IOStatus io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
}
}
@ -5252,9 +5263,10 @@ Status VersionSet::WriteCurrentStateToManifest(
return Status::Corruption(
"Unable to Encode VersionEdit:" + edit.DebugString(true));
}
Status s = log->AddRecord(record);
if (!s.ok()) {
return s;
IOStatus io_s = log->AddRecord(record);
if (!io_s.ok()) {
io_status_ = io_s;
return std::move(io_s);
}
}
}

@ -1075,6 +1075,12 @@ class VersionSet {
static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
// Get the IO Status returned by written Manifest.
IOStatus io_status() const { return io_status_; }
// Set the IO Status to OK. Called before Manifest write if needed.
void SetIOStatusOK() { io_status_ = IOStatus::OK(); }
protected:
struct ManifestWriter;
@ -1193,6 +1199,9 @@ class VersionSet {
BlockCacheTracer* const block_cache_tracer_;
// Store the IO status when Manifest is written
IOStatus io_status_;
private:
// REQUIRES db mutex at beginning. may release and re-acquire db mutex
Status ProcessManifestWrites(std::deque<ManifestWriter>& writers,

@ -744,7 +744,7 @@ class VersionSetTestBase {
PrepareManifest(&column_families, &last_seqno, &log_writer);
log_writer.reset();
// Make "CURRENT" file point to the new manifest file.
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
EXPECT_OK(versions_->Recover(column_families, false));
@ -847,7 +847,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
edits_[i].MarkAtomicGroup(--remaining);
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
}
void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) {
@ -859,7 +859,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
edits_[i].MarkAtomicGroup(--remaining);
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
}
void SetupCorruptedAtomicGroup(int atomic_group_size) {
@ -873,7 +873,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
}
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
}
void SetupIncorrectAtomicGroup(int atomic_group_size) {
@ -889,7 +889,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase,
}
edits_[i].SetLastSequence(last_seqno_++);
}
ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr));
ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr));
}
void SetupTestSyncPoints() {
@ -1241,7 +1241,7 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) {
SequenceNumber last_seqno;
std::unique_ptr<log::Writer> log_writer;
PrepareManifest(&column_families, &last_seqno, &log_writer);
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
EXPECT_OK(versions_->Recover(column_families, false /* read_only */));
@ -1378,7 +1378,7 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase,
TEST_F(EmptyDefaultCfNewManifest, Recover) {
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
@ -1440,7 +1440,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) {
db_options_.write_dbid_to_manifest = std::get<0>(GetParam());
PrepareManifest(nullptr, nullptr, &log_writer_);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
@ -1483,7 +1483,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) {
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
@ -1529,7 +1529,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) {
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
@ -1586,7 +1586,7 @@ TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) {
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
@ -1642,7 +1642,7 @@ TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) {
ASSERT_OK(s);
}
log_writer_.reset();
s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr);
s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr);
ASSERT_OK(s);
std::string manifest_path;
@ -1901,7 +1901,7 @@ TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) {
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
@ -1952,7 +1952,7 @@ TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) {
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, added_files, std::vector<std::pair<int, uint64_t>>());
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);
@ -2001,7 +2001,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) {
WriteFileAdditionAndDeletionToManifest(
/*cf=*/0, std::vector<std::pair<int, FileMetaData>>(), deleted_files);
log_writer_.reset();
Status s = SetCurrentFile(env_, dbname_, 1, nullptr);
Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
ASSERT_OK(s);
std::string manifest_path;
VerifyManifest(&manifest_path);

@ -20,22 +20,6 @@ namespace ROCKSDB_NAMESPACE {
// Options::env only, whereas in the latter case, the user will specify
// Options::env and Options::file_system.
inline IOStatus status_to_io_status(Status&& status) {
if (status.ok()) {
// Fast path
return IOStatus::OK();
} else {
const char* state = status.getState();
if (state) {
return IOStatus(status.code(), status.subcode(),
Slice(state, strlen(status.getState()) + 1),
Slice());
} else {
return IOStatus(status.code(), status.subcode());
}
}
}
class CompositeSequentialFileWrapper : public SequentialFile {
public:
explicit CompositeSequentialFileWrapper(

16
env/env.cc vendored

@ -369,20 +369,8 @@ void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
bool should_sync) {
std::unique_ptr<WritableFile> file;
EnvOptions soptions;
Status s = env->NewWritableFile(fname, &file, soptions);
if (!s.ok()) {
return s;
}
s = file->Append(data);
if (s.ok() && should_sync) {
s = file->Sync();
}
if (!s.ok()) {
env->DeleteFile(fname);
}
return s;
LegacyFileSystemWrapper lfsw(env);
return WriteStringToFile(&lfsw, data, fname, should_sync);
}
Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {

25
env/file_system.cc vendored

@ -83,12 +83,31 @@ FileOptions FileSystem::OptimizeForCompactionTableRead(
return optimized_file_options;
}
Status ReadFileToString(FileSystem* fs, const std::string& fname,
std::string* data) {
IOStatus WriteStringToFile(FileSystem* fs, const Slice& data,
const std::string& fname, bool should_sync) {
std::unique_ptr<FSWritableFile> file;
EnvOptions soptions;
IOStatus s = fs->NewWritableFile(fname, soptions, &file, nullptr);
if (!s.ok()) {
return s;
}
s = file->Append(data, IOOptions(), nullptr);
if (s.ok() && should_sync) {
s = file->Sync(IOOptions(), nullptr);
}
if (!s.ok()) {
fs->DeleteFile(fname, IOOptions(), nullptr);
}
return s;
}
IOStatus ReadFileToString(FileSystem* fs, const std::string& fname,
std::string* data) {
FileOptions soptions;
data->clear();
std::unique_ptr<FSSequentialFile> file;
Status s = fs->NewSequentialFile(fname, soptions, &file, nullptr);
IOStatus s = status_to_io_status(
fs->NewSequentialFile(fname, soptions, &file, nullptr));
if (!s.ok()) {
return s;
}

@ -368,7 +368,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number,
return true;
}
Status SetCurrentFile(Env* env, const std::string& dbname,
IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync) {
// Remove leading "dbname/" and add newline to manifest file name
@ -377,10 +377,10 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
assert(contents.starts_with(dbname + "/"));
contents.remove_prefix(dbname.size() + 1);
std::string tmp = TempFileName(dbname, descriptor_number);
Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp, true);
IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true);
if (s.ok()) {
TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2);
s = env->RenameFile(tmp, CurrentFileName(dbname));
s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr);
TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2);
}
if (s.ok()) {
@ -388,7 +388,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname,
s = directory_to_fsync->Fsync(IOOptions(), nullptr);
}
} else {
env->DeleteFile(tmp);
fs->DeleteFile(tmp, IOOptions(), nullptr);
}
return s;
}
@ -414,8 +414,8 @@ Status SetIdentityFile(Env* env, const std::string& dbname,
return s;
}
Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2);
StopWatch sw(env, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS);
return file->Sync(db_options->use_fsync);

@ -169,7 +169,7 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number,
// Make the CURRENT file point to the descriptor file with the
// specified number.
extern Status SetCurrentFile(Env* env, const std::string& dbname,
extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname,
uint64_t descriptor_number,
FSDirectory* directory_to_fsync);
@ -178,8 +178,8 @@ extern Status SetIdentityFile(Env* env, const std::string& dbname,
const std::string& db_id = {});
// Sync manifest file `file`.
extern Status SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file);
extern IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options,
WritableFileWriter* file);
// Return list of file names of info logs in `file_names`.
// The list only contains file name. The parent directory name is stored

@ -21,10 +21,10 @@
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
Status WritableFileWriter::Append(const Slice& data) {
IOStatus WritableFileWriter::Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
Status s;
IOStatus s;
pending_sync_ = true;
TEST_KILL_RANDOM("WritableFileWriter::Append:0",
@ -94,7 +94,7 @@ Status WritableFileWriter::Append(const Slice& data) {
return s;
}
Status WritableFileWriter::Pad(const size_t pad_bytes) {
IOStatus WritableFileWriter::Pad(const size_t pad_bytes) {
assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize();
@ -107,7 +107,7 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) {
buf_.PadWith(append_bytes, 0);
left -= append_bytes;
if (left > 0) {
Status s = Flush();
IOStatus s = Flush();
if (!s.ok()) {
return s;
}
@ -116,12 +116,12 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) {
}
pending_sync_ = true;
filesize_ += pad_bytes;
return Status::OK();
return IOStatus::OK();
}
Status WritableFileWriter::Close() {
IOStatus WritableFileWriter::Close() {
// Do not quit immediately on failure the file MUST be closed
Status s;
IOStatus s;
// Possible to close it twice now as we MUST close
// in __dtor, simply flushing is not enough
@ -133,7 +133,7 @@ Status WritableFileWriter::Close() {
s = Flush(); // flush cache to OS
Status interim;
IOStatus interim;
// In direct I/O mode we write whole pages so
// we need to let the file know where data ends.
if (use_direct_io()) {
@ -160,8 +160,8 @@ Status WritableFileWriter::Close() {
// write out the cached data to the OS cache or storage if direct I/O
// enabled
Status WritableFileWriter::Flush() {
Status s;
IOStatus WritableFileWriter::Flush() {
IOStatus s;
TEST_KILL_RANDOM("WritableFileWriter::Flush:0",
rocksdb_kill_odds * REDUCE_ODDS2);
@ -224,8 +224,8 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const {
}
}
Status WritableFileWriter::Sync(bool use_fsync) {
Status s = Flush();
IOStatus WritableFileWriter::Sync(bool use_fsync) {
IOStatus s = Flush();
if (!s.ok()) {
return s;
}
@ -238,23 +238,23 @@ Status WritableFileWriter::Sync(bool use_fsync) {
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds);
pending_sync_ = false;
return Status::OK();
return IOStatus::OK();
}
Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (!writable_file_->IsSyncThreadSafe()) {
return Status::NotSupported(
return IOStatus::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
"WritableFile::IsSyncThreadSafe() is false");
}
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
Status s = SyncInternal(use_fsync);
IOStatus s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
return s;
}
Status WritableFileWriter::SyncInternal(bool use_fsync) {
Status s;
IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
IOStatus s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
auto prev_perf_level = GetPerfLevel();
@ -268,7 +268,7 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) {
return s;
}
Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
return writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr);
@ -276,8 +276,8 @@ Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
// This method writes to disk the specified data and makes use of the rate
// limiter if available
Status WritableFileWriter::WriteBuffered(const char* data, size_t size) {
Status s;
IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) {
IOStatus s;
assert(!use_direct_io());
const char* src = data;
size_t left = size;
@ -352,9 +352,9 @@ void WritableFileWriter::CalculateFileChecksum(const Slice& data) {
// only write on aligned
// offsets.
#ifndef ROCKSDB_LITE
Status WritableFileWriter::WriteDirect() {
IOStatus WritableFileWriter::WriteDirect() {
assert(use_direct_io());
Status s;
IOStatus s;
const size_t alignment = buf_.Alignment();
assert((next_write_offset_ % alignment) == 0);

@ -15,6 +15,7 @@
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
@ -36,11 +37,11 @@ class WritableFileWriter {
void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
const FileOperationInfo::TimePoint& start_ts,
const FileOperationInfo::TimePoint& finish_ts,
const Status& status) {
const IOStatus& io_status) {
FileOperationInfo info(file_name_, start_ts, finish_ts);
info.offset = offset;
info.length = length;
info.status = status;
info.status = io_status;
for (auto& listener : listeners_) {
listener->OnFileWriteFinish(info);
@ -122,24 +123,24 @@ class WritableFileWriter {
std::string file_name() const { return file_name_; }
Status Append(const Slice& data);
IOStatus Append(const Slice& data);
Status Pad(const size_t pad_bytes);
IOStatus Pad(const size_t pad_bytes);
Status Flush();
IOStatus Flush();
Status Close();
IOStatus Close();
Status Sync(bool use_fsync);
IOStatus Sync(bool use_fsync);
// Sync only the data that was already Flush()ed. Safe to call concurrently
// with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
// returns NotSupported status.
Status SyncWithoutFlush(bool use_fsync);
IOStatus SyncWithoutFlush(bool use_fsync);
uint64_t GetFileSize() const { return filesize_; }
Status InvalidateCache(size_t offset, size_t length) {
IOStatus InvalidateCache(size_t offset, size_t length) {
return writable_file_->InvalidateCache(offset, length);
}
@ -161,11 +162,11 @@ class WritableFileWriter {
// Used when os buffering is OFF and we are writing
// DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
Status WriteDirect();
IOStatus WriteDirect();
#endif // !ROCKSDB_LITE
// Normal write
Status WriteBuffered(const char* data, size_t size);
Status RangeSync(uint64_t offset, uint64_t nbytes);
Status SyncInternal(bool use_fsync);
IOStatus WriteBuffered(const char* data, size_t size);
IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
IOStatus SyncInternal(bool use_fsync);
};
} // namespace ROCKSDB_NAMESPACE

@ -1393,8 +1393,13 @@ class FSDirectoryWrapper : public FSDirectory {
FSDirectory* target_;
};
// A utility routine: write "data" to the named file.
extern IOStatus WriteStringToFile(FileSystem* fs, const Slice& data,
const std::string& fname,
bool should_sync = false);
// A utility routine: read contents of named file into *data
extern Status ReadFileToString(FileSystem* fs, const std::string& fname,
std::string* data);
extern IOStatus ReadFileToString(FileSystem* fs, const std::string& fname,
std::string* data);
} // namespace ROCKSDB_NAMESPACE

@ -227,4 +227,19 @@ inline bool IOStatus::operator!=(const IOStatus& rhs) const {
return !(*this == rhs);
}
inline IOStatus status_to_io_status(Status&& status) {
if (status.ok()) {
// Fast path
return IOStatus::OK();
} else {
const char* state = status.getState();
if (state) {
return IOStatus(status.code(), status.subcode(),
Slice(state, strlen(status.getState()) + 1), Slice());
} else {
return IOStatus(status.code(), status.subcode());
}
}
}
} // namespace ROCKSDB_NAMESPACE

@ -284,6 +284,7 @@ struct BlockBasedTableBuilder::Rep {
WritableFileWriter* file;
uint64_t offset = 0;
Status status;
IOStatus io_status;
size_t alignment;
BlockBuilder data_block;
// Buffers uncompressed data blocks and keys to replay later. Needed when
@ -725,8 +726,9 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
assert(r->status.ok());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
assert(r->io_status.ok());
r->io_status = r->file->Append(block_contents);
if (r->io_status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
char* trailer_without_type = trailer + 1;
@ -765,32 +767,35 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
}
}
assert(r->status.ok());
assert(r->io_status.ok());
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum",
static_cast<char*>(trailer));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->io_status.ok()) {
r->status = InsertBlockInCache(block_contents, type, handle);
}
if (r->status.ok()) {
if (r->status.ok() && r->io_status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
if (r->table_options.block_align && is_data_block) {
size_t pad_bytes =
(r->alignment - ((block_contents.size() + kBlockTrailerSize) &
(r->alignment - 1))) &
(r->alignment - 1);
r->status = r->file->Pad(pad_bytes);
if (r->status.ok()) {
r->io_status = r->file->Pad(pad_bytes);
if (r->io_status.ok()) {
r->offset += pad_bytes;
}
}
}
}
r->status = r->io_status;
}
Status BlockBasedTableBuilder::status() const { return rep_->status; }
IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; }
//
// Make a copy of the block contents and insert into compressed block cache
//
@ -1050,10 +1055,12 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle,
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
assert(r->status.ok());
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
assert(r->io_status.ok());
r->io_status = r->file->Append(footer_encoding);
if (r->io_status.ok()) {
r->offset += footer_encoding.size();
}
r->status = r->io_status;
}
void BlockBasedTableBuilder::EnterUnbuffered() {

@ -68,6 +68,9 @@ class BlockBasedTableBuilder : public TableBuilder {
// Return non-ok iff some error has been detected.
Status status() const override;
// Return non-ok iff some error happens during IO.
IOStatus io_status() const override;
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called

@ -252,9 +252,9 @@ Status CuckooTableBuilder::Finish() {
hash_table_size_ =
static_cast<uint64_t>(num_entries_ / max_hash_table_ratio_);
}
s = MakeHashTable(&buckets);
if (!s.ok()) {
return s;
status_ = MakeHashTable(&buckets);
if (!status_.ok()) {
return status_;
}
// Determine unused_user_key to fill empty buckets.
std::string unused_user_key = smallest_user_key_;
@ -301,18 +301,19 @@ Status CuckooTableBuilder::Finish() {
uint32_t num_added = 0;
for (auto& bucket : buckets) {
if (bucket.vector_idx == kMaxVectorIdx) {
s = file_->Append(Slice(unused_bucket));
io_status_ = file_->Append(Slice(unused_bucket));
} else {
++num_added;
s = file_->Append(GetKey(bucket.vector_idx));
if (s.ok()) {
io_status_ = file_->Append(GetKey(bucket.vector_idx));
if (io_status_.ok()) {
if (value_size_ > 0) {
s = file_->Append(GetValue(bucket.vector_idx));
io_status_ = file_->Append(GetValue(bucket.vector_idx));
}
}
}
if (!s.ok()) {
return s;
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
}
assert(num_added == NumEntries());
@ -364,10 +365,11 @@ Status CuckooTableBuilder::Finish() {
BlockHandle property_block_handle;
property_block_handle.set_offset(offset);
property_block_handle.set_size(property_block.size());
s = file_->Append(property_block);
io_status_ = file_->Append(property_block);
offset += property_block.size();
if (!s.ok()) {
return s;
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
meta_index_builder.Add(kPropertiesBlock, property_block_handle);
@ -376,9 +378,10 @@ Status CuckooTableBuilder::Finish() {
BlockHandle meta_index_block_handle;
meta_index_block_handle.set_offset(offset);
meta_index_block_handle.set_size(meta_index_block.size());
s = file_->Append(meta_index_block);
if (!s.ok()) {
return s;
io_status_ = file_->Append(meta_index_block);
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
Footer footer(kCuckooTableMagicNumber, 1);
@ -386,12 +389,13 @@ Status CuckooTableBuilder::Finish() {
footer.set_index_handle(BlockHandle::NullBlockHandle());
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
s = file_->Append(footer_encoding);
io_status_ = file_->Append(footer_encoding);
if (file_ != nullptr) {
file_checksum_ = file_->GetFileChecksum();
}
return s;
status_ = io_status_;
return status_;
}
void CuckooTableBuilder::Abandon() {

@ -46,6 +46,9 @@ class CuckooTableBuilder: public TableBuilder {
// Return non-ok iff some error has been detected.
Status status() const override { return status_; }
// Return non-ok iff some error happens during IO.
IOStatus io_status() const override { return io_status_; }
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
@ -116,6 +119,7 @@ class CuckooTableBuilder: public TableBuilder {
// Number of keys that contain value (non-deletion op)
uint64_t num_values_;
Status status_;
IOStatus io_status_;
TableProperties properties_;
const Comparator* ucomp_;
bool use_module_hash_;

@ -15,6 +15,7 @@
#include "db/version_edit.h"
#include "port/port.h"
#include "rocksdb/comparator.h"
#include "rocksdb/io_status.h"
#include "rocksdb/table.h"
#include "table/internal_iterator.h"
#include "table/table_builder.h"
@ -138,6 +139,9 @@ class MockTableBuilder : public TableBuilder {
// Return non-ok iff some error has been detected.
Status status() const override { return Status::OK(); }
// Return non-ok iff some error happens during IO.
IOStatus io_status() const override { return IOStatus::OK(); }
Status Finish() override {
MutexLock lock_guard(&file_system_->mutex);
file_system_->files.insert({id_, table_});

@ -36,16 +36,16 @@ namespace {
// a utility that helps writing block content to the file
// @offset will advance if @block_contents was successfully written.
// @block_handle the block handle this particular block.
Status WriteBlock(const Slice& block_contents, WritableFileWriter* file,
uint64_t* offset, BlockHandle* block_handle) {
IOStatus WriteBlock(const Slice& block_contents, WritableFileWriter* file,
uint64_t* offset, BlockHandle* block_handle) {
block_handle->set_offset(*offset);
block_handle->set_size(block_contents.size());
Status s = file->Append(block_contents);
IOStatus io_s = file->Append(block_contents);
if (s.ok()) {
if (io_s.ok()) {
*offset += block_contents.size();
}
return s;
return io_s;
}
} // namespace
@ -145,41 +145,46 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
assert(offset_ <= std::numeric_limits<uint32_t>::max());
auto prev_offset = static_cast<uint32_t>(offset_);
// Write out the key
encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf,
&meta_bytes_buf_size);
io_status_ = encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf,
&meta_bytes_buf_size);
if (SaveIndexInFile()) {
index_builder_->AddKeyPrefix(GetPrefix(internal_key), prev_offset);
}
// Write value length
uint32_t value_size = static_cast<uint32_t>(value.size());
char* end_ptr =
EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size);
assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf));
meta_bytes_buf_size = end_ptr - meta_bytes_buf;
file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size));
if (io_status_.ok()) {
char* end_ptr =
EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size);
assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf));
meta_bytes_buf_size = end_ptr - meta_bytes_buf;
io_status_ = file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size));
}
// Write value
file_->Append(value);
offset_ += value_size + meta_bytes_buf_size;
properties_.num_entries++;
properties_.raw_key_size += key.size();
properties_.raw_value_size += value.size();
if (internal_key.type == kTypeDeletion ||
internal_key.type == kTypeSingleDeletion) {
properties_.num_deletions++;
} else if (internal_key.type == kTypeMerge) {
properties_.num_merge_operands++;
if (io_status_.ok()) {
io_status_ = file_->Append(value);
offset_ += value_size + meta_bytes_buf_size;
}
if (io_status_.ok()) {
properties_.num_entries++;
properties_.raw_key_size += key.size();
properties_.raw_value_size += value.size();
if (internal_key.type == kTypeDeletion ||
internal_key.type == kTypeSingleDeletion) {
properties_.num_deletions++;
} else if (internal_key.type == kTypeMerge) {
properties_.num_merge_operands++;
}
}
// notify property collectors
NotifyCollectTableCollectorsOnAdd(
key, value, offset_, table_properties_collectors_, ioptions_.info_log);
status_ = io_status_;
}
Status PlainTableBuilder::status() const { return status_; }
Status PlainTableBuilder::Finish() {
assert(!closed_);
closed_ = true;
@ -214,10 +219,12 @@ Status PlainTableBuilder::Finish() {
Slice bloom_finish_result = bloom_block_.Finish();
properties_.filter_size = bloom_finish_result.size();
s = WriteBlock(bloom_finish_result, file_, &offset_, &bloom_block_handle);
io_status_ =
WriteBlock(bloom_finish_result, file_, &offset_, &bloom_block_handle);
if (!s.ok()) {
return s;
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
meta_index_builer.Add(BloomBlockBuilder::kBloomBlock, bloom_block_handle);
}
@ -225,10 +232,12 @@ Status PlainTableBuilder::Finish() {
Slice index_finish_result = index_builder_->Finish();
properties_.index_size = index_finish_result.size();
s = WriteBlock(index_finish_result, file_, &offset_, &index_block_handle);
io_status_ =
WriteBlock(index_finish_result, file_, &offset_, &index_block_handle);
if (!s.ok()) {
return s;
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
meta_index_builer.Add(PlainTableIndexBuilder::kPlainTableIndexBlock,
@ -249,27 +258,24 @@ Status PlainTableBuilder::Finish() {
// -- Write property block
BlockHandle property_block_handle;
auto s = WriteBlock(
IOStatus s = WriteBlock(
property_block_builder.Finish(),
file_,
&offset_,
&property_block_handle
);
if (!s.ok()) {
return s;
return std::move(s);
}
meta_index_builer.Add(kPropertiesBlock, property_block_handle);
// -- write metaindex block
BlockHandle metaindex_block_handle;
s = WriteBlock(
meta_index_builer.Finish(),
file_,
&offset_,
&metaindex_block_handle
);
if (!s.ok()) {
return s;
io_status_ = WriteBlock(meta_index_builer.Finish(), file_, &offset_,
&metaindex_block_handle);
if (!io_status_.ok()) {
status_ = io_status_;
return status_;
}
// Write Footer
@ -279,15 +285,16 @@ Status PlainTableBuilder::Finish() {
footer.set_index_handle(BlockHandle::NullBlockHandle());
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
s = file_->Append(footer_encoding);
if (s.ok()) {
io_status_ = file_->Append(footer_encoding);
if (io_status_.ok()) {
offset_ += footer_encoding.size();
}
if (file_ != nullptr) {
file_checksum_ = file_->GetFileChecksum();
}
return s;
status_ = io_status_;
return status_;
}
void PlainTableBuilder::Abandon() {

@ -59,7 +59,10 @@ class PlainTableBuilder: public TableBuilder {
void Add(const Slice& key, const Slice& value) override;
// Return non-ok iff some error has been detected.
Status status() const override;
Status status() const override { return status_; }
// Return non-ok iff some error happens during IO.
IOStatus io_status() const override { return io_status_; }
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
@ -105,6 +108,7 @@ class PlainTableBuilder: public TableBuilder {
uint32_t bloom_bits_per_key_;
size_t huge_page_tlb_size_;
Status status_;
IOStatus io_status_;
TableProperties properties_;
PlainTableKeyEncoder encoder_;

@ -80,13 +80,13 @@ inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset,
}
}
Status PlainTableKeyEncoder::AppendKey(const Slice& key,
WritableFileWriter* file,
uint64_t* offset, char* meta_bytes_buf,
size_t* meta_bytes_buf_size) {
IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key,
WritableFileWriter* file,
uint64_t* offset, char* meta_bytes_buf,
size_t* meta_bytes_buf_size) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(key, &parsed_key)) {
return Status::Corruption(Slice());
return IOStatus::Corruption(Slice());
}
Slice key_to_write = key; // Portion of internal key to write out.
@ -99,9 +99,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
char* ptr = EncodeVarint32(key_size_buf, user_key_size);
assert(ptr <= key_size_buf + sizeof(key_size_buf));
auto len = ptr - key_size_buf;
Status s = file->Append(Slice(key_size_buf, len));
if (!s.ok()) {
return s;
IOStatus io_s = file->Append(Slice(key_size_buf, len));
if (!io_s.ok()) {
return io_s;
}
*offset += len;
}
@ -117,9 +117,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
key_count_for_prefix_ = 1;
pre_prefix_.SetUserKey(prefix);
size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes);
Status s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!s.ok()) {
return s;
IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!io_s.ok()) {
return io_s;
}
*offset += size_bytes_pos;
} else {
@ -135,9 +135,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
static_cast<uint32_t>(pre_prefix_.GetUserKey().size());
size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len,
size_bytes + size_bytes_pos);
Status s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!s.ok()) {
return s;
IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos));
if (!io_s.ok()) {
return io_s;
}
*offset += size_bytes_pos;
key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len);
@ -149,20 +149,23 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key,
// If the row is of value type with seqId 0, flush the special flag together
// in this buffer to safe one file append call, which takes 1 byte.
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
Status s =
IOStatus io_s =
file->Append(Slice(key_to_write.data(), key_to_write.size() - 8));
if (!s.ok()) {
return s;
if (!io_s.ok()) {
return io_s;
}
*offset += key_to_write.size() - 8;
meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0;
*meta_bytes_buf_size += 1;
} else {
file->Append(key_to_write);
IOStatus io_s = file->Append(key_to_write);
if (!io_s.ok()) {
return io_s;
}
*offset += key_to_write.size();
}
return Status::OK();
return IOStatus::OK();
}
Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset,

@ -44,8 +44,9 @@ class PlainTableKeyEncoder {
// meta_bytes_buf: buffer for extra meta bytes
// meta_bytes_buf_size: offset to append extra meta bytes. Will be updated
// if meta_bytes_buf is updated.
Status AppendKey(const Slice& key, WritableFileWriter* file, uint64_t* offset,
char* meta_bytes_buf, size_t* meta_bytes_buf_size);
IOStatus AppendKey(const Slice& key, WritableFileWriter* file,
uint64_t* offset, char* meta_bytes_buf,
size_t* meta_bytes_buf_size);
// Return actual encoding type to be picked
EncodingType GetEncodingType() { return encoding_type_; }

@ -136,6 +136,9 @@ class TableBuilder {
// Return non-ok iff some error has been detected.
virtual Status status() const = 0;
// Return non-ok iff some error happens during IO.
virtual IOStatus io_status() const = 0;
// Finish building the table.
// REQUIRES: Finish(), Abandon() have not been called
virtual Status Finish() = 0;

Loading…
Cancel
Save