diff --git a/db/builder.cc b/db/builder.cc index fdb814cbb..c78b3b618 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -81,10 +81,11 @@ Status BuildTable( SnapshotChecker* snapshot_checker, const CompressionType compression, uint64_t sample_for_compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, - TableFileCreationReason reason, EventLogger* event_logger, int job_id, - const Env::IOPriority io_priority, TableProperties* table_properties, - int level, const uint64_t creation_time, const uint64_t oldest_key_time, - Env::WriteLifeTimeHint write_hint, const uint64_t file_creation_time) { + TableFileCreationReason reason, IOStatus* io_status, + EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, + TableProperties* table_properties, int level, const uint64_t creation_time, + const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint, + const uint64_t file_creation_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -185,11 +186,13 @@ Status BuildTable( tp = builder->GetTableProperties(); bool empty = builder->NumEntries() == 0 && tp.num_range_deletions == 0; s = c_iter.status(); + TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); if (!s.ok() || empty) { builder->Abandon(); } else { s = builder->Finish(); } + *io_status = builder->io_status(); if (s.ok() && !empty) { uint64_t file_size = builder->FileSize(); @@ -209,11 +212,16 @@ Status BuildTable( // Finish and check for file errors if (s.ok() && !empty) { StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS); - s = file_writer->Sync(ioptions.use_fsync); + *io_status = file_writer->Sync(ioptions.use_fsync); } - if (s.ok() && !empty) { - s = file_writer->Close(); + if (io_status->ok() && !empty) { + *io_status = file_writer->Close(); } + if (!io_status->ok()) { + s = *io_status; + } + + // TODO Also check the IO status when create the 
Iterator. if (s.ok() && !empty) { // Verify that the table is usable diff --git a/db/builder.h b/db/builder.h index 062f1fb80..512310a3c 100644 --- a/db/builder.h +++ b/db/builder.h @@ -78,7 +78,7 @@ extern Status BuildTable( const uint64_t sample_for_compression, const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, - EventLogger* event_logger = nullptr, int job_id = 0, + IOStatus* io_status, EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1, const uint64_t creation_time = 0, const uint64_t oldest_key_time = 0, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 9f42f00ee..84c8d08bf 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -613,8 +613,13 @@ Status CompactionJob::Run() { } } + IOStatus io_s; if (status.ok() && output_directory_) { - status = output_directory_->Fsync(IOOptions(), nullptr); + io_s = output_directory_->Fsync(IOOptions(), nullptr); + } + if (!io_s.ok()) { + io_status_ = io_s; + status = io_s; } if (status.ok()) { @@ -713,9 +718,13 @@ Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { cfd->internal_stats()->AddCompactionStats( compact_->compaction->output_level(), thread_pri_, compaction_stats_); + versions_->SetIOStatusOK(); if (status.ok()) { status = InstallCompactionResults(mutable_cf_options); } + if (!versions_->io_status().ok()) { + io_status_ = versions_->io_status(); + } VersionStorageInfo::LevelSummaryStorage tmp; auto vstorage = cfd->current()->storage_info(); const auto& stats = compaction_stats_; @@ -1294,6 +1303,10 @@ Status CompactionJob::FinishCompactionOutputFile( } else { sub_compact->builder->Abandon(); } + if (!sub_compact->builder->io_status().ok()) { + io_status_ = sub_compact->builder->io_status(); + s = io_status_; + } const uint64_t 
current_bytes = sub_compact->builder->FileSize(); if (s.ok()) { // Add the checksum information to file metadata. @@ -1307,12 +1320,17 @@ Status CompactionJob::FinishCompactionOutputFile( sub_compact->total_bytes += current_bytes; // Finish and check for file errors + IOStatus io_s; if (s.ok()) { StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); - s = sub_compact->outfile->Sync(db_options_.use_fsync); + io_s = sub_compact->outfile->Sync(db_options_.use_fsync); } - if (s.ok()) { - s = sub_compact->outfile->Close(); + if (io_s.ok()) { + io_s = sub_compact->outfile->Close(); + } + if (!io_s.ok()) { + io_status_ = io_s; + s = io_s; } sub_compact->outfile.reset(); diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index 55c1ca254..0c7b09a7d 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -100,6 +100,9 @@ class CompactionJob { // Add compaction input/output to the current version Status Install(const MutableCFOptions& mutable_cf_options); + // Return the IO status + IOStatus io_status() const { return io_status_; } + private: struct SubcompactionState; @@ -193,6 +196,7 @@ class CompactionJob { std::vector sizes_; Env::WriteLifeTimeHint write_hint_; Env::Priority thread_pri_; + IOStatus io_status_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index be68c9db8..20be87c94 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -281,7 +281,7 @@ class CompactionJobTest : public testing::Test { } ASSERT_OK(s); // Make "CURRENT" file that points to the new manifest file. 
- s = SetCurrentFile(env_, dbname_, 1, nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); std::vector column_families; cf_options_.table_factory = mock_table_factory_; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bc753223c..ab277cdbf 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -311,6 +311,11 @@ Status DBImpl::ResumeImpl() { s = bg_error; } + // Make sure the IO Status stored in version set is set to OK. + if(s.ok()) { + versions_->SetIOStatusOK(); + } + // We cannot guarantee consistency of the WAL. So force flush Memtables of // all the column families if (s.ok()) { @@ -1146,25 +1151,25 @@ int DBImpl::FindMinimumEmptyLevelFitting( Status DBImpl::FlushWAL(bool sync) { if (manual_wal_flush_) { - Status s; + IOStatus io_s; { // We need to lock log_write_mutex_ since logs_ might change concurrently InstrumentedMutexLock wl(&log_write_mutex_); log::Writer* cur_log_writer = logs_.back().writer; - s = cur_log_writer->WriteBuffer(); + io_s = cur_log_writer->WriteBuffer(); } - if (!s.ok()) { + if (!io_s.ok()) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", - s.ToString().c_str()); + io_s.ToString().c_str()); // In case there is a fs error we should set it globally to prevent the // future writes - WriteStatusCheck(s); + IOStatusCheck(io_s); // whether sync or not, we should abort the rest of function upon error - return s; + return std::move(io_s); } if (!sync) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); - return s; + return std::move(io_s); } } if (!sync) { @@ -1216,12 +1221,21 @@ Status DBImpl::SyncWAL() { TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); RecordTick(stats_, WAL_FILE_SYNCED); Status status; + IOStatus io_s; for (log::Writer* log : logs_to_sync) { - status = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync); - if (!status.ok()) { + io_s = log->file()->SyncWithoutFlush(immutable_db_options_.use_fsync); + if (!io_s.ok()) { + status = 
io_s; break; } } + if (!io_s.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL Sync error %s", + io_s.ToString().c_str()); + // In case there is a fs error we should set it globally to prevent the + // future writes + IOStatusCheck(io_s); + } if (status.ok() && need_log_dir_sync) { status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } @@ -1248,7 +1262,7 @@ Status DBImpl::LockWAL() { // future writes WriteStatusCheck(status); } - return status; + return std::move(status); } Status DBImpl::UnlockWAL() { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 2a057a873..167bc81e9 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -1364,7 +1364,7 @@ class DBImpl : public DB { void ReleaseFileNumberFromPendingOutputs( std::unique_ptr::iterator>& v); - Status SyncClosedLogs(JobContext* job_context); + IOStatus SyncClosedLogs(JobContext* job_context); // Flush the in-memory write buffer to storage. Switches to a new // log-file/memtable and writes a new descriptor iff successful. 
Then @@ -1501,21 +1501,25 @@ class DBImpl : public DB { WriteBatch* tmp_batch, size_t* write_with_wal, WriteBatch** to_be_cached_state); - Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, - uint64_t* log_used, uint64_t* log_size); + IOStatus WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer, + uint64_t* log_used, uint64_t* log_size); - Status WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, uint64_t* log_used, - bool need_log_sync, bool need_log_dir_sync, - SequenceNumber sequence); + IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence); - Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, - uint64_t* log_used, SequenceNumber* last_sequence, - size_t seq_inc); + IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, + uint64_t* log_used, + SequenceNumber* last_sequence, size_t seq_inc); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. void WriteStatusCheck(const Status& status); + // Used by WriteImpl to update bg_error_ when IO error happens, e.g., write + // WAL, sync WAL fails, if paranoid check is enabled. + void IOStatusCheck(const IOStatus& status); + // Used by WriteImpl to update bg_error_ in case of memtable insert error. 
void MemTableInsertStatusCheck(const Status& memtable_insert_status); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index ac9a97a75..8cb0febe9 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -79,7 +79,7 @@ bool DBImpl::RequestCompactionToken(ColumnFamilyData* cfd, bool force, return false; } -Status DBImpl::SyncClosedLogs(JobContext* job_context) { +IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Start"); mutex_.AssertHeld(); autovector logs_to_sync; @@ -96,7 +96,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { logs_to_sync.push_back(log.writer); } - Status s; + IOStatus io_s; if (!logs_to_sync.empty()) { mutex_.Unlock(); @@ -104,34 +104,34 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { ROCKS_LOG_INFO(immutable_db_options_.info_log, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, log->get_log_number()); - s = log->file()->Sync(immutable_db_options_.use_fsync); - if (!s.ok()) { + io_s = log->file()->Sync(immutable_db_options_.use_fsync); + if (!io_s.ok()) { break; } if (immutable_db_options_.recycle_log_file_num > 0) { - s = log->Close(); - if (!s.ok()) { + io_s = log->Close(); + if (!io_s.ok()) { break; } } } - if (s.ok()) { - s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); + if (io_s.ok()) { + io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } mutex_.Lock(); // "number <= current_log_number - 1" is equivalent to // "number < current_log_number". 
- MarkLogsSynced(current_log_number - 1, true, s); - if (!s.ok()) { - error_handler_.SetBGError(s, BackgroundErrorReason::kFlush); + MarkLogsSynced(current_log_number - 1, true, io_s); + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed"); - return s; + return io_s; } } - return s; + return io_s; } Status DBImpl::FlushMemTableToOutputFile( @@ -155,7 +155,6 @@ Status DBImpl::FlushMemTableToOutputFile( GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, true /* sync_output_directory */, true /* write_manifest */, thread_pri); - FileMetaData file_meta; TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"); @@ -168,6 +167,7 @@ Status DBImpl::FlushMemTableToOutputFile( #endif // ROCKSDB_LITE Status s; + IOStatus io_s; if (logfile_number_ > 0 && versions_->GetColumnFamilySet()->NumberOfColumnFamilies() > 1) { // If there are more than one column families, we need to make sure that @@ -176,7 +176,8 @@ Status DBImpl::FlushMemTableToOutputFile( // flushed SST may contain data from write batches whose updates to // other column families are missing. // SyncClosedLogs() may unlock and re-lock the db_mutex. 
- s = SyncClosedLogs(job_context); + io_s = SyncClosedLogs(job_context); + s = io_s; } else { TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip"); } @@ -192,6 +193,7 @@ Status DBImpl::FlushMemTableToOutputFile( } else { flush_job.Cancel(); } + io_s = flush_job.io_status(); if (s.ok()) { InstallSuperVersionAndScheduleWork(cfd, superversion_context, @@ -206,8 +208,12 @@ Status DBImpl::FlushMemTableToOutputFile( } if (!s.ok() && !s.IsShutdownInProgress() && !s.IsColumnFamilyDropped()) { - Status new_bg_error = s; - error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); + if (!io_s.ok()&& !io_s.IsShutdownInProgress() && !io_s.IsColumnFamilyDropped()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } else { + Status new_bg_error = s; + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); + } } if (s.ok()) { #ifndef ROCKSDB_LITE @@ -344,6 +350,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( std::vector file_meta(num_cfs); Status s; + IOStatus io_s; assert(num_cfs == static_cast(jobs.size())); #ifndef ROCKSDB_LITE @@ -358,15 +365,18 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( if (logfile_number_ > 0) { // TODO (yanqin) investigate whether we should sync the closed logs for // single column family case. - s = SyncClosedLogs(job_context); + io_s = SyncClosedLogs(job_context); + s = io_s; } // exec_status stores the execution status of flush_jobs as // autovector> exec_status; + autovector io_status; for (int i = 0; i != num_cfs; ++i) { // Initially all jobs are not executed, with status OK. 
exec_status.emplace_back(false, Status::OK()); + io_status.emplace_back(IOStatus::OK()); } if (s.ok()) { @@ -375,6 +385,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( exec_status[i].second = jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; + io_status[i] = jobs[i]->io_status(); } if (num_cfs > 1) { TEST_SYNC_POINT( @@ -387,6 +398,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( exec_status[0].second = jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true; + io_status[0] = jobs[0]->io_status(); Status error_status; for (const auto& e : exec_status) { @@ -405,6 +417,20 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( s = error_status.ok() ? s : error_status; } + if (io_s.ok()) { + IOStatus io_error = IOStatus::OK(); + for (int i = 0; i != static_cast(io_status.size()); i++) { + if (!io_status[i].ok() && !io_status[i].IsShutdownInProgress() && + !io_status[i].IsColumnFamilyDropped()) { + io_error = io_status[i]; + } + } + io_s = io_error; + if (s.ok() && !io_s.ok()) { + s = io_s; + } + } + if (s.IsColumnFamilyDropped()) { s = Status::OK(); } @@ -543,9 +569,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #endif // ROCKSDB_LITE } - if (!s.ok() && !s.IsShutdownInProgress()) { - Status new_bg_error = s; - error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); + // On a failure other than CF drop, raise a background error. Prefer io_s + // when it holds a real IO error so SetBGError sees its IO-specific flags. 
+ if (!s.ok() && !s.IsColumnFamilyDropped()) { + if (!io_s.ok() && !io_s.IsColumnFamilyDropped()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush); + } else { + Status new_bg_error = s; + error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush); + } } return s; @@ -2633,6 +2665,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } + IOStatus io_s; if (!c) { // Nothing to do ROCKS_LOG_BUFFER(log_buffer, "Compaction nothing to do"); @@ -2654,9 +2687,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } + versions_->SetIOStatusOK(); status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); + io_s = versions_->io_status(); InstallSuperVersionAndScheduleWork(c->column_family_data(), &job_context->superversion_contexts[0], *c->mutable_cf_options()); @@ -2710,9 +2745,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } } + versions_->SetIOStatusOK(); status = versions_->LogAndApply(c->column_family_data(), *c->mutable_cf_options(), c->edit(), &mutex_, directories_.GetDbDir()); + io_s = versions_->io_status(); // Use latest MutableCFOptions InstallSuperVersionAndScheduleWork(c->column_family_data(), &job_context->superversion_contexts[0], @@ -2799,6 +2836,7 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, mutex_.Lock(); status = compaction_job.Install(*c->mutable_cf_options()); + io_s = compaction_job.io_status(); if (status.ok()) { InstallSuperVersionAndScheduleWork(c->column_family_data(), &job_context->superversion_contexts[0], @@ -2808,6 +2846,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:AfterCompaction", c->column_family_data()); } + + if (status.ok() && !io_s.ok()) { + status = io_s; + } + if (c != nullptr) { c->ReleaseCompactionFiles(status); *made_progress = 
true; @@ -2833,7 +2876,11 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, } else { ROCKS_LOG_WARN(immutable_db_options_.info_log, "Compaction error: %s", status.ToString().c_str()); - error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction); + } else { + error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction); + } if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) { // Put this cfd back in the compaction queue so we can retry after some // time diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index a1ff1ed8e..a689330e2 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -292,7 +292,7 @@ Status DBImpl::NewDB() { } if (s.ok()) { // Make "CURRENT" file that points to the new manifest file. - s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir()); + s = SetCurrentFile(fs_.get(), dbname_, 1, directories_.GetDbDir()); } else { fs_->DeleteFile(manifest, IOOptions(), nullptr); } @@ -1239,6 +1239,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, if (range_del_iter != nullptr) { range_del_iters.emplace_back(range_del_iter); } + IOStatus io_s; s = BuildTable( dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options, file_options_for_compaction_, cfd->table_cache(), iter.get(), @@ -1248,7 +1249,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), mutable_cf_options.sample_for_compression, cfd->ioptions()->compression_opts, paranoid_file_checks, - cfd->internal_stats(), TableFileCreationReason::kRecovery, + cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s, &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */, -1 /* level */, current_time, write_hint); LogFlush(immutable_db_options_.info_log); diff --git 
a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 20a81ff84..faf552d25 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -101,6 +101,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, disable_memtable); Status status; + IOStatus io_s; if (write_options.low_pri) { status = ThrottleLowPriWritesIfNeeded(write_options, my_batch); if (!status.ok()) { @@ -322,21 +323,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, if (!two_write_queues_) { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); - status = WriteToWAL(write_group, log_writer, log_used, need_log_sync, - need_log_dir_sync, last_sequence + 1); + io_s = WriteToWAL(write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, last_sequence + 1); } } else { if (status.ok() && !write_options.disableWAL) { PERF_TIMER_GUARD(write_wal_time); // LastAllocatedSequence is increased inside WriteToWAL under // wal_write_mutex_ to ensure ordered events in WAL - status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, - seq_inc); + io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + seq_inc); } else { // Otherwise we inc seq number for memtable writes last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); } } + status = io_s; assert(last_sequence != kMaxSequenceNumber); const SequenceNumber current_sequence = last_sequence + 1; last_sequence += seq_inc; @@ -411,7 +413,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteStatusCheck(status); + if (!io_s.ok()) { + IOStatusCheck(io_s); + } else { + WriteStatusCheck(status); + } } if (need_log_sync) { @@ -515,6 +521,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); + IOStatus io_s; if (w.status.ok() && !write_options.disableWAL) { 
PERF_TIMER_GUARD(write_wal_time); stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); @@ -524,12 +531,17 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); } - w.status = WriteToWAL(wal_write_group, log_writer, log_used, - need_log_sync, need_log_dir_sync, current_sequence); + io_s = WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync, + need_log_dir_sync, current_sequence); + w.status = io_s; } if (!w.CallbackFailed()) { - WriteStatusCheck(w.status); + if (!io_s.ok()) { + IOStatusCheck(io_s); + } else { + WriteStatusCheck(w.status); + } } if (need_log_sync) { @@ -740,9 +752,10 @@ Status DBImpl::WriteImplWALOnly( } seq_inc = total_batch_cnt; } + IOStatus io_s; if (!write_options.disableWAL) { - status = - ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); + io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); + status = io_s; } else { // Otherwise we inc seq number to do solely the seq allocation last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); @@ -777,7 +790,11 @@ Status DBImpl::WriteImplWALOnly( PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteStatusCheck(status); + if (!io_s.ok()) { + IOStatusCheck(io_s); + } else { + WriteStatusCheck(status); + } } if (status.ok()) { size_t index = 0; @@ -823,6 +840,17 @@ void DBImpl::WriteStatusCheck(const Status& status) { } } +void DBImpl::IOStatusCheck(const IOStatus& io_status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. 
+ if (immutable_db_options_.paranoid_checks && !io_status.ok() && + !io_status.IsBusy() && !io_status.IsIncomplete()) { + mutex_.Lock(); + error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); + mutex_.Unlock(); + } +} + void DBImpl::MemTableInsertStatusCheck(const Status& status) { // A non-OK status here indicates that the state implied by the // WAL has diverged from the in-memory state. This could be @@ -961,9 +989,9 @@ WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, // When two_write_queues_ is disabled, this function is called from the only // write thread. Otherwise this must be called holding log_write_mutex_. -Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, - log::Writer* log_writer, uint64_t* log_used, - uint64_t* log_size) { +IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, + log::Writer* log_writer, uint64_t* log_used, + uint64_t* log_size) { assert(log_size != nullptr); Slice log_entry = WriteBatchInternal::Contents(&merged_batch); *log_size = log_entry.size(); @@ -978,7 +1006,8 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, if (UNLIKELY(needs_locking)) { log_write_mutex_.Lock(); } - Status status = log_writer->AddRecord(log_entry); + IOStatus io_s = log_writer->AddRecord(log_entry); + if (UNLIKELY(needs_locking)) { log_write_mutex_.Unlock(); } @@ -990,15 +1019,14 @@ Status DBImpl::WriteToWAL(const WriteBatch& merged_batch, // since alive_log_files_ might be modified concurrently alive_log_files_.back().AddSize(log_entry.size()); log_empty_ = false; - return status; + return io_s; } -Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, - log::Writer* log_writer, uint64_t* log_used, - bool need_log_sync, bool need_log_dir_sync, - SequenceNumber sequence) { - Status status; - +IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + 
SequenceNumber sequence) { + IOStatus io_s; assert(!write_group.leader->disable_wal); // Same holds for all in the batch group size_t write_with_wal = 0; @@ -1016,13 +1044,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatchInternal::SetSequence(merged_batch, sequence); uint64_t log_size; - status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; } - if (status.ok() && need_log_sync) { + if (io_s.ok() && need_log_sync) { StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); // It's safe to access logs_ with unlocked mutex_ here because: // - we've set getting_synced=true for all logs, @@ -1032,23 +1060,24 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, // - as long as other threads don't modify it, it's safe to read // from std::deque from multiple threads concurrently. for (auto& log : logs_) { - status = log.writer->file()->Sync(immutable_db_options_.use_fsync); - if (!status.ok()) { + io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync); + if (!io_s.ok()) { break; } } - if (status.ok() && need_log_dir_sync) { + + if (io_s.ok() && need_log_dir_sync) { // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, // we can avoid the disk I/O in the write code path. 
- status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); + io_s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } } if (merged_batch == &tmp_batch_) { tmp_batch_.Clear(); } - if (status.ok()) { + if (io_s.ok()) { auto stats = default_cf_internal_stats_; if (need_log_sync) { stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1); @@ -1059,14 +1088,13 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } - return status; + return io_s; } -Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, - uint64_t* log_used, - SequenceNumber* last_sequence, - size_t seq_inc) { - Status status; +IOStatus DBImpl::ConcurrentWriteToWAL( + const WriteThread::WriteGroup& write_group, uint64_t* log_used, + SequenceNumber* last_sequence, size_t seq_inc) { + IOStatus io_s; assert(!write_group.leader->disable_wal); // Same holds for all in the batch group @@ -1092,14 +1120,14 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, log::Writer* log_writer = logs_.back().writer; uint64_t log_size; - status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size); if (to_be_cached_state) { cached_recoverable_state_ = *to_be_cached_state; cached_recoverable_state_empty_ = false; } log_write_mutex_.Unlock(); - if (status.ok()) { + if (io_s.ok()) { const bool concurrent = true; auto stats = default_cf_internal_stats_; stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size, @@ -1109,7 +1137,7 @@ Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, concurrent); RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); } - return status; + return io_s; } Status DBImpl::WriteRecoverableState() { diff --git a/db/error_handler.cc b/db/error_handler.cc index 3ba4d9fd9..cbee2b811 
100644 --- a/db/error_handler.cc +++ b/db/error_handler.cc @@ -238,6 +238,46 @@ Status ErrorHandler::SetBGError(const Status& bg_err, BackgroundErrorReason reas return bg_error_; } +Status ErrorHandler::SetBGError(const IOStatus& bg_io_err, + BackgroundErrorReason reason) { + db_mutex_->AssertHeld(); + if (bg_io_err.ok()) { + return Status::OK(); + } + if (recovery_in_prog_ && recovery_error_.ok()) { + recovery_error_ = bg_io_err; + } + Status new_bg_io_err = bg_io_err; + Status s; + if (bg_io_err.GetDataLoss()) { + // First, data loss is treated as unrecoverable error. So it can directly + // overwrite any existing bg_error_. + bool auto_recovery = false; + Status bg_err(new_bg_io_err, Status::Severity::kUnrecoverableError); + bg_error_ = bg_err; + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, + &bg_err, db_mutex_, &auto_recovery); + return bg_error_; + } else if (bg_io_err.GetRetryable()) { + // Second, check if the error is a retryable IO error or not. If it is + // retryable error and its severity is higher than bg_error_, overwrite + // the bg_error_ with new error. + // In current stage, treat retryable error as HardError. No automatic + // recovery. 
+ bool auto_recovery = false; + Status bg_err(new_bg_io_err, Status::Severity::kHardError); + EventHelpers::NotifyOnBackgroundError(db_options_.listeners, reason, + &bg_err, db_mutex_, &auto_recovery); + if (bg_err.severity() > bg_error_.severity()) { + bg_error_ = bg_err; + } + return bg_error_; + } else { + s = SetBGError(new_bg_io_err, reason); + } + return s; +} + Status ErrorHandler::OverrideNoSpaceError(Status bg_error, bool* auto_recovery) { #ifndef ROCKSDB_LITE diff --git a/db/error_handler.h b/db/error_handler.h index 7276f6510..535ed675f 100644 --- a/db/error_handler.h +++ b/db/error_handler.h @@ -6,6 +6,7 @@ #include "monitoring/instrumented_mutex.h" #include "options/db_options.h" +#include "rocksdb/io_status.h" #include "rocksdb/listener.h" #include "rocksdb/status.h" @@ -34,6 +35,8 @@ class ErrorHandler { Status SetBGError(const Status& bg_err, BackgroundErrorReason reason); + Status SetBGError(const IOStatus& bg_io_err, BackgroundErrorReason reason); + Status GetBGError() { return bg_error_; } Status GetRecoveryError() { return recovery_error_; } diff --git a/db/error_handler_fs_test.cc b/db/error_handler_fs_test.cc index 9dbab3f1e..b32fec84b 100644 --- a/db/error_handler_fs_test.cc +++ b/db/error_handler_fs_test.cc @@ -181,6 +181,69 @@ TEST_F(DBErrorHandlingFSTest, FLushWriteError) { Destroy(options); } +TEST_F(DBErrorHandlingFSTest, FLushWritRetryableeError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + Status s; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(1), "val1"); + SyncPoint::GetInstance()->SetCallBack( + 
"BuildTable:BeforeFinishBuildTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Reopen(options); + ASSERT_EQ("val1", Get(Key(1))); + + Put(Key(2), "val2"); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeSyncTable", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Reopen(options); + ASSERT_EQ("val2", Get(Key(2))); + + Put(Key(3), "val3"); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:BeforeCloseTableFile", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Reopen(options); + ASSERT_EQ("val3", Get(Key(3))); + + Destroy(options); +} + TEST_F(DBErrorHandlingFSTest, ManifestWriteError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); @@ -224,6 +287,51 @@ TEST_F(DBErrorHandlingFSTest, ManifestWriteError) { Close(); } +TEST_F(DBErrorHandlingFSTest, ManifestWriteRetryableError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + 
options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.listeners.emplace_back(listener); + Status s; + std::string old_manifest; + std::string new_manifest; + + listener->EnableAutoRecovery(false); + DestroyAndReopen(options); + old_manifest = GetManifestNameFromLiveFiles(); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(0), "val"); + Flush(); + Put(Key(1), "val"); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::LogAndApply:WriteManifest", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + SyncPoint::GetInstance()->EnableProcessing(); + s = Flush(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + fault_fs->SetFilesystemActive(true); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + + new_manifest = GetManifestNameFromLiveFiles(); + ASSERT_NE(new_manifest, old_manifest); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + Close(); +} + TEST_F(DBErrorHandlingFSTest, DoubleManifestWriteError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); @@ -347,6 +455,61 @@ TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteError) { Close(); } +TEST_F(DBErrorHandlingFSTest, CompactionManifestWriteRetryableError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); + Status s; + std::string old_manifest; + std::string new_manifest; + DestroyAndReopen(options); + old_manifest = GetManifestNameFromLiveFiles(); + + 
IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + Put(Key(0), "val"); + Put(Key(2), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError)); + listener->EnableAutoRecovery(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + + new_manifest = GetManifestNameFromLiveFiles(); + ASSERT_NE(new_manifest, old_manifest); + + Reopen(options); + ASSERT_EQ("val", Get(Key(0))); + ASSERT_EQ("val", Get(Key(1))); + ASSERT_EQ("val", Get(Key(2))); + Close(); +} + TEST_F(DBErrorHandlingFSTest, CompactionWriteError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); @@ -391,6 +554,53 @@ TEST_F(DBErrorHandlingFSTest, CompactionWriteError) { Destroy(options); } +TEST_F(DBErrorHandlingFSTest, CompactionWriteRetryableError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.level0_file_num_compaction_trigger = 2; + options.listeners.emplace_back(listener); + Status s; + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + 
+ Put(Key(0), "va;"); + Put(Key(2), "va;"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + listener->OverrideBGError(Status(error_msg, Status::Severity::kHardError)); + listener->EnableAutoRecovery(false); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:FlushMemTableFinished", + "BackgroundCallCompaction:0"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( + "BackgroundCallCompaction:0", + [&](void*) { fault_fs->SetFilesystemActive(false, error_msg); }); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + Put(Key(1), "val"); + s = Flush(); + ASSERT_EQ(s, Status::OK()); + + s = dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kHardError); + + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + Destroy(options); +} + TEST_F(DBErrorHandlingFSTest, CorruptionError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); @@ -566,6 +776,101 @@ TEST_F(DBErrorHandlingFSTest, WALWriteError) { Close(); } +TEST_F(DBErrorHandlingFSTest, WALWriteRetryableError) { + std::shared_ptr fault_fs( + new FaultInjectionTestFS(FileSystem::Default())); + std::unique_ptr fault_fs_env(NewCompositeEnv(fault_fs)); + std::shared_ptr listener( + new ErrorHandlerFSListener()); + Options options = GetDefaultOptions(); + options.env = fault_fs_env.get(); + options.create_if_missing = true; + options.writable_file_max_buffer_size = 32768; + options.listeners.emplace_back(listener); + options.paranoid_checks = true; + Status s; + Random rnd(301); + + DestroyAndReopen(options); + + IOStatus error_msg = IOStatus::IOError("Retryable IO Error"); + error_msg.SetRetryable(true); + + // For the first batch, write is successful, require sync + { + WriteBatch batch; + + for (auto i = 0; i < 100; ++i) { + batch.Put(Key(i), 
RandomString(&rnd, 1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + // For the second batch, the first 2 file Append are successful, then the + // following Append fails due to file system retryable IOError. + { + WriteBatch batch; + int write_error = 0; + + for (auto i = 100; i < 200; ++i) { + batch.Put(Key(i), RandomString(&rnd, 1024)); + } + + SyncPoint::GetInstance()->SetCallBack( + "WritableFileWriter::Append:BeforePrepareWrite", [&](void*) { + write_error++; + if (write_error > 2) { + fault_fs->SetFilesystemActive(false, error_msg); + } + }); + SyncPoint::GetInstance()->EnableProcessing(); + WriteOptions wopts; + wopts.sync = true; + s = dbfull()->Write(wopts, &batch); + ASSERT_EQ(true, s.IsIOError()); + } + fault_fs->SetFilesystemActive(true); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->DisableProcessing(); + + // Data in corrupted WAL are not stored + for (auto i = 0; i < 199; ++i) { + if (i < 100) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + + // Resume and write a new batch, should be in the WAL + s = dbfull()->Resume(); + ASSERT_EQ(s, Status::OK()); + { + WriteBatch batch; + + for (auto i = 200; i < 300; ++i) { + batch.Put(Key(i), RandomString(&rnd, 1024)); + } + + WriteOptions wopts; + wopts.sync = true; + ASSERT_EQ(dbfull()->Write(wopts, &batch), Status::OK()); + }; + + Reopen(options); + for (auto i = 0; i < 300; ++i) { + if (i < 100 || i >= 200) { + ASSERT_NE(Get(Key(i)), "NOT_FOUND"); + } else { + ASSERT_EQ(Get(Key(i)), "NOT_FOUND"); + } + } + Close(); +} + TEST_F(DBErrorHandlingFSTest, MultiCFWALWriteError) { std::shared_ptr fault_fs( new FaultInjectionTestFS(FileSystem::Default())); diff --git a/db/flush_job.cc b/db/flush_job.cc index 8d0eeb7ca..c45e33b30 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -238,10 +238,14 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, } 
else if (write_manifest_) { TEST_SYNC_POINT("FlushJob::InstallResults"); // Replace immutable memtable with the generated Table + IOStatus tmp_io_s; s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_, &committed_flush_jobs_info_); + log_buffer_, &committed_flush_jobs_info_, &tmp_io_s); + if (!tmp_io_s.ok()) { + io_status_ = tmp_io_s; + } } if (s.ok() && file_meta != nullptr) { @@ -371,6 +375,7 @@ Status FlushJob::WriteLevel0Table() { meta_.oldest_ancester_time = std::min(current_time, oldest_key_time); meta_.file_creation_time = current_time; + IOStatus io_s; s = BuildTable( dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(), mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(), @@ -381,10 +386,13 @@ Status FlushJob::WriteLevel0Table() { output_compression_, mutable_cf_options_.sample_for_compression, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), - TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */, + TableFileCreationReason::kFlush, &io_s, event_logger_, + job_context_->job_id, Env::IO_HIGH, &table_properties_, 0 /* level */, meta_.oldest_ancester_time, oldest_key_time, write_hint, current_time); + if (!io_s.ok()) { + io_status_ = io_s; + } LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/flush_job.h b/db/flush_job.h index 3c80390a4..ec3a0d981 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -90,6 +90,9 @@ class FlushJob { } #endif // !ROCKSDB_LITE + // Return the IO status + IOStatus io_status() const { return io_status_; } + private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); @@ -154,6 +157,7 @@ class FlushJob { Version* base_; bool pick_memtable_called; Env::Priority thread_pri_; + 
IOStatus io_status_; }; } // namespace ROCKSDB_NAMESPACE diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 319330e70..cee6d61e4 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -108,7 +108,7 @@ class FlushJobTest : public testing::Test { } ASSERT_OK(s); // Make "CURRENT" file that points to the new manifest file. - s = SetCurrentFile(env_, dbname_, 1, nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); } Env* env_; diff --git a/db/log_writer.cc b/db/log_writer.cc index 0222ee2a7..04d3f64cc 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -37,10 +37,10 @@ Writer::~Writer() { } } -Status Writer::WriteBuffer() { return dest_->Flush(); } +IOStatus Writer::WriteBuffer() { return dest_->Flush(); } -Status Writer::Close() { - Status s; +IOStatus Writer::Close() { + IOStatus s; if (dest_) { s = dest_->Close(); dest_.reset(); @@ -48,7 +48,7 @@ Status Writer::Close() { return s; } -Status Writer::AddRecord(const Slice& slice) { +IOStatus Writer::AddRecord(const Slice& slice) { const char* ptr = slice.data(); size_t left = slice.size(); @@ -59,7 +59,7 @@ Status Writer::AddRecord(const Slice& slice) { // Fragment the record if necessary and emit it. 
Note that if slice // is empty, we still want to iterate once to emit a single // zero-length record - Status s; + IOStatus s; bool begin = true; do { const int64_t leftover = kBlockSize - block_offset_; @@ -114,7 +114,7 @@ Status Writer::AddRecord(const Slice& slice) { bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } -Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { +IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { assert(n <= 0xffff); // Must fit in two bytes size_t header_size; @@ -150,7 +150,7 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { EncodeFixed32(buf, crc); // Write the header and the payload - Status s = dest_->Append(Slice(buf, header_size)); + IOStatus s = dest_->Append(Slice(buf, header_size)); if (s.ok()) { s = dest_->Append(Slice(ptr, n)); } diff --git a/db/log_writer.h b/db/log_writer.h index a7f952edd..463826e88 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -13,6 +13,7 @@ #include #include "db/log_format.h" +#include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -79,16 +80,16 @@ class Writer { ~Writer(); - Status AddRecord(const Slice& slice); + IOStatus AddRecord(const Slice& slice); WritableFileWriter* file() { return dest_.get(); } const WritableFileWriter* file() const { return dest_.get(); } uint64_t get_log_number() const { return log_number_; } - Status WriteBuffer(); + IOStatus WriteBuffer(); - Status Close(); + IOStatus Close(); bool TEST_BufferIsEmpty(); @@ -103,7 +104,7 @@ class Writer { // record type stored in the header. uint32_t type_crc_[kMaxRecordType + 1]; - Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); + IOStatus EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); // If true, it does not flush after each write. 
Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() diff --git a/db/memtable_list.cc b/db/memtable_list.cc index a1f4d5625..1a23d18cd 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -390,7 +390,8 @@ Status MemTableList::TryInstallMemtableFlushResults( VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, - std::list>* committed_flush_jobs_info) { + std::list>* committed_flush_jobs_info, + IOStatus* io_s) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -471,8 +472,10 @@ Status MemTableList::TryInstallMemtableFlushResults( } // this can release and reacquire the mutex. + vset->SetIOStatusOK(); s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, db_directory); + *io_s = vset->io_status(); // we will be changing the version in the next code path, // so we better create a new one, since versions are immutable diff --git a/db/memtable_list.h b/db/memtable_list.h index 5c00bbf43..bf6750bf2 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -266,7 +266,8 @@ class MemTableList { VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, - std::list>* committed_flush_jobs_info); + std::list>* committed_flush_jobs_info, + IOStatus* io_s); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 8981c12ba..6e0d33651 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -114,13 +114,14 @@ class MemTableListTest : public testing::Test { auto cfd = column_family_set->GetDefault(); EXPECT_TRUE(nullptr != cfd); uint64_t file_num = file_number.fetch_add(1); + IOStatus io_s; // Create dummy mutex. 
InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); std::list> flush_jobs_info; Status s = list->TryInstallMemtableFlushResults( cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, - file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); + file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s); return s; } diff --git a/db/repair.cc b/db/repair.cc index b2df33f06..5155be985 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -424,6 +424,7 @@ class Repairer { } LegacyFileSystemWrapper fs(env_); + IOStatus io_s; status = BuildTable( dbname_, env_, &fs, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_, @@ -432,7 +433,7 @@ class Repairer { cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, snapshot_checker, kNoCompression, 0 /* sample_for_compression */, CompressionOptions(), false, nullptr /* internal_stats */, - TableFileCreationReason::kRecovery, nullptr /* event_logger */, + TableFileCreationReason::kRecovery, &io_s, nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */, -1 /* level */, current_time, write_hint); ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/version_set.cc b/db/version_set.cc index 8e875cc2a..6eda94bbe 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3796,6 +3796,7 @@ Status VersionSet::ProcessManifestWrites( uint64_t new_manifest_file_size = 0; Status s; + IOStatus io_s; assert(pending_manifest_file_number_ == 0); if (!descriptor_log_ || @@ -3906,15 +3907,19 @@ Status VersionSet::ProcessManifestWrites( } ++idx; #endif /* !NDEBUG */ - s = descriptor_log_->AddRecord(record); - if (!s.ok()) { + io_s = descriptor_log_->AddRecord(record); + if (!io_s.ok()) { + io_status_ = io_s; + s = io_s; break; } } if (s.ok()) { - s = SyncManifest(env_, db_options_, descriptor_log_->file()); + io_s = SyncManifest(env_, db_options_, descriptor_log_->file()); } - if (!s.ok()) { + if (!io_s.ok()) { + io_status_ = io_s; + s = io_s; 
ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", s.ToString().c_str()); } @@ -3923,8 +3928,12 @@ Status VersionSet::ProcessManifestWrites( // If we just created a new descriptor file, install it by writing a // new CURRENT file that points to it. if (s.ok() && new_descriptor_log) { - s = SetCurrentFile(env_, dbname_, pending_manifest_file_number_, - db_directory); + io_s = SetCurrentFile(fs_, dbname_, pending_manifest_file_number_, + db_directory); + if (!io_s.ok()) { + io_status_ = io_s; + s = io_s; + } TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest"); } @@ -5194,9 +5203,10 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption("Unable to Encode VersionEdit:" + edit_for_db_id.DebugString(true)); } - Status add_record = log->AddRecord(db_id_record); - if (!add_record.ok()) { - return add_record; + IOStatus io_s = log->AddRecord(db_id_record); + if (!io_s.ok()) { + io_status_ = io_s; + return std::move(io_s); } } @@ -5221,9 +5231,10 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; + IOStatus io_s = log->AddRecord(record); + if (!io_s.ok()) { + io_status_ = io_s; + return std::move(io_s); } } @@ -5252,9 +5263,10 @@ Status VersionSet::WriteCurrentStateToManifest( return Status::Corruption( "Unable to Encode VersionEdit:" + edit.DebugString(true)); } - Status s = log->AddRecord(record); - if (!s.ok()) { - return s; + IOStatus io_s = log->AddRecord(record); + if (!io_s.ok()) { + io_status_ = io_s; + return std::move(io_s); } } } diff --git a/db/version_set.h b/db/version_set.h index 8070f867d..8e29a6d94 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1075,6 +1075,12 @@ class VersionSet { static uint64_t GetTotalSstFilesSize(Version* dummy_versions); + // Get the IO Status returned by written Manifest. 
+ IOStatus io_status() const { return io_status_; } + + // Set the IO Status to OK. Called before Manifest write if needed. + void SetIOStatusOK() { io_status_ = IOStatus::OK(); } + protected: struct ManifestWriter; @@ -1193,6 +1199,9 @@ class VersionSet { BlockCacheTracer* const block_cache_tracer_; + // Store the IO status when Manifest is written + IOStatus io_status_; + private: // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index e877273d2..85cd29275 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -744,7 +744,7 @@ class VersionSetTestBase { PrepareManifest(&column_families, &last_seqno, &log_writer); log_writer.reset(); // Make "CURRENT" file point to the new manifest file. - Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); EXPECT_OK(versions_->Recover(column_families, false)); @@ -847,7 +847,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, edits_[i].MarkAtomicGroup(--remaining); edits_[i].SetLastSequence(last_seqno_++); } - ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); } void SetupIncompleteTrailingAtomicGroup(int atomic_group_size) { @@ -859,7 +859,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, edits_[i].MarkAtomicGroup(--remaining); edits_[i].SetLastSequence(last_seqno_++); } - ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); } void SetupCorruptedAtomicGroup(int atomic_group_size) { @@ -873,7 +873,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, } edits_[i].SetLastSequence(last_seqno_++); } - ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); } void SetupIncorrectAtomicGroup(int 
atomic_group_size) { @@ -889,7 +889,7 @@ class VersionSetAtomicGroupTest : public VersionSetTestBase, } edits_[i].SetLastSequence(last_seqno_++); } - ASSERT_OK(SetCurrentFile(env_, dbname_, 1, nullptr)); + ASSERT_OK(SetCurrentFile(fs_.get(), dbname_, 1, nullptr)); } void SetupTestSyncPoints() { @@ -1241,7 +1241,7 @@ TEST_P(VersionSetTestDropOneCF, HandleDroppedColumnFamilyInAtomicGroup) { SequenceNumber last_seqno; std::unique_ptr log_writer; PrepareManifest(&column_families, &last_seqno, &log_writer); - Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); EXPECT_OK(versions_->Recover(column_families, false /* read_only */)); @@ -1378,7 +1378,7 @@ class EmptyDefaultCfNewManifest : public VersionSetTestBase, TEST_F(EmptyDefaultCfNewManifest, Recover) { PrepareManifest(nullptr, nullptr, &log_writer_); log_writer_.reset(); - Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; VerifyManifest(&manifest_path); @@ -1440,7 +1440,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest0) { db_options_.write_dbid_to_manifest = std::get<0>(GetParam()); PrepareManifest(nullptr, nullptr, &log_writer_); log_writer_.reset(); - Status s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; @@ -1483,7 +1483,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromIncompleteManifest1) { ASSERT_OK(s); } log_writer_.reset(); - s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; @@ -1529,7 +1529,7 @@ TEST_P(VersionSetTestEmptyDb, OpenFromInCompleteManifest2) { ASSERT_OK(s); } log_writer_.reset(); - s = 
SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; @@ -1586,7 +1586,7 @@ TEST_P(VersionSetTestEmptyDb, OpenManifestWithUnknownCF) { ASSERT_OK(s); } log_writer_.reset(); - s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; @@ -1642,7 +1642,7 @@ TEST_P(VersionSetTestEmptyDb, OpenCompleteManifest) { ASSERT_OK(s); } log_writer_.reset(); - s = SetCurrentFile(env_, dbname_, 1, /*directory_to_fsync=*/nullptr); + s = SetCurrentFile(fs_.get(), dbname_, 1, /*directory_to_fsync=*/nullptr); ASSERT_OK(s); std::string manifest_path; @@ -1901,7 +1901,7 @@ TEST_F(VersionSetTestMissingFiles, ManifestFarBehindSst) { WriteFileAdditionAndDeletionToManifest( /*cf=*/0, std::vector>(), deleted_files); log_writer_.reset(); - Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); std::string manifest_path; VerifyManifest(&manifest_path); @@ -1952,7 +1952,7 @@ TEST_F(VersionSetTestMissingFiles, ManifestAheadofSst) { WriteFileAdditionAndDeletionToManifest( /*cf=*/0, added_files, std::vector>()); log_writer_.reset(); - Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); std::string manifest_path; VerifyManifest(&manifest_path); @@ -2001,7 +2001,7 @@ TEST_F(VersionSetTestMissingFiles, NoFileMissing) { WriteFileAdditionAndDeletionToManifest( /*cf=*/0, std::vector>(), deleted_files); log_writer_.reset(); - Status s = SetCurrentFile(env_, dbname_, 1, nullptr); + Status s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr); ASSERT_OK(s); std::string manifest_path; VerifyManifest(&manifest_path); diff --git a/env/composite_env_wrapper.h b/env/composite_env_wrapper.h index 
936d54f71..43f6ecdc5 100644 --- a/env/composite_env_wrapper.h +++ b/env/composite_env_wrapper.h @@ -20,22 +20,6 @@ namespace ROCKSDB_NAMESPACE { // Options::env only, whereas in the latter case, the user will specify // Options::env and Options::file_system. -inline IOStatus status_to_io_status(Status&& status) { - if (status.ok()) { - // Fast path - return IOStatus::OK(); - } else { - const char* state = status.getState(); - if (state) { - return IOStatus(status.code(), status.subcode(), - Slice(state, strlen(status.getState()) + 1), - Slice()); - } else { - return IOStatus(status.code(), status.subcode()); - } - } -} - class CompositeSequentialFileWrapper : public SequentialFile { public: explicit CompositeSequentialFileWrapper( diff --git a/env/env.cc b/env/env.cc index c6eef1448..509bd0566 100644 --- a/env/env.cc +++ b/env/env.cc @@ -369,20 +369,8 @@ void Log(const std::shared_ptr& info_log, const char* format, ...) { Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname, bool should_sync) { - std::unique_ptr file; - EnvOptions soptions; - Status s = env->NewWritableFile(fname, &file, soptions); - if (!s.ok()) { - return s; - } - s = file->Append(data); - if (s.ok() && should_sync) { - s = file->Sync(); - } - if (!s.ok()) { - env->DeleteFile(fname); - } - return s; + LegacyFileSystemWrapper lfsw(env); + return WriteStringToFile(&lfsw, data, fname, should_sync); } Status ReadFileToString(Env* env, const std::string& fname, std::string* data) { diff --git a/env/file_system.cc b/env/file_system.cc index 00840bc86..800e654ff 100644 --- a/env/file_system.cc +++ b/env/file_system.cc @@ -83,12 +83,31 @@ FileOptions FileSystem::OptimizeForCompactionTableRead( return optimized_file_options; } -Status ReadFileToString(FileSystem* fs, const std::string& fname, - std::string* data) { +IOStatus WriteStringToFile(FileSystem* fs, const Slice& data, + const std::string& fname, bool should_sync) { + std::unique_ptr file; + EnvOptions soptions; + 
IOStatus s = fs->NewWritableFile(fname, soptions, &file, nullptr); + if (!s.ok()) { + return s; + } + s = file->Append(data, IOOptions(), nullptr); + if (s.ok() && should_sync) { + s = file->Sync(IOOptions(), nullptr); + } + if (!s.ok()) { + fs->DeleteFile(fname, IOOptions(), nullptr); + } + return s; +} + +IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, + std::string* data) { FileOptions soptions; data->clear(); std::unique_ptr file; - Status s = fs->NewSequentialFile(fname, soptions, &file, nullptr); + IOStatus s = status_to_io_status( + fs->NewSequentialFile(fname, soptions, &file, nullptr)); if (!s.ok()) { return s; } diff --git a/file/filename.cc b/file/filename.cc index 07cc08369..2ff0e30c6 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -368,7 +368,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number, return true; } -Status SetCurrentFile(Env* env, const std::string& dbname, +IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, uint64_t descriptor_number, FSDirectory* directory_to_fsync) { // Remove leading "dbname/" and add newline to manifest file name @@ -377,10 +377,10 @@ Status SetCurrentFile(Env* env, const std::string& dbname, assert(contents.starts_with(dbname + "/")); contents.remove_prefix(dbname.size() + 1); std::string tmp = TempFileName(dbname, descriptor_number); - Status s = WriteStringToFile(env, contents.ToString() + "\n", tmp, true); + IOStatus s = WriteStringToFile(fs, contents.ToString() + "\n", tmp, true); if (s.ok()) { TEST_KILL_RANDOM("SetCurrentFile:0", rocksdb_kill_odds * REDUCE_ODDS2); - s = env->RenameFile(tmp, CurrentFileName(dbname)); + s = fs->RenameFile(tmp, CurrentFileName(dbname), IOOptions(), nullptr); TEST_KILL_RANDOM("SetCurrentFile:1", rocksdb_kill_odds * REDUCE_ODDS2); } if (s.ok()) { @@ -388,7 +388,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname, s = directory_to_fsync->Fsync(IOOptions(), nullptr); } } else { - env->DeleteFile(tmp); + 
fs->DeleteFile(tmp, IOOptions(), nullptr); } return s; } @@ -414,8 +414,8 @@ Status SetIdentityFile(Env* env, const std::string& dbname, return s; } -Status SyncManifest(Env* env, const ImmutableDBOptions* db_options, - WritableFileWriter* file) { +IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options, + WritableFileWriter* file) { TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2); StopWatch sw(env, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS); return file->Sync(db_options->use_fsync); diff --git a/file/filename.h b/file/filename.h index 9a4be0a53..7dfe81d37 100644 --- a/file/filename.h +++ b/file/filename.h @@ -169,7 +169,7 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number, // Make the CURRENT file point to the descriptor file with the // specified number. -extern Status SetCurrentFile(Env* env, const std::string& dbname, +extern IOStatus SetCurrentFile(FileSystem* fs, const std::string& dbname, uint64_t descriptor_number, FSDirectory* directory_to_fsync); @@ -178,8 +178,8 @@ extern Status SetIdentityFile(Env* env, const std::string& dbname, const std::string& db_id = {}); // Sync manifest file `file`. -extern Status SyncManifest(Env* env, const ImmutableDBOptions* db_options, - WritableFileWriter* file); +extern IOStatus SyncManifest(Env* env, const ImmutableDBOptions* db_options, + WritableFileWriter* file); // Return list of file names of info logs in `file_names`. // The list only contains file name. 
The parent directory name is stored diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index d5894c17a..87eef04ec 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -21,10 +21,10 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -Status WritableFileWriter::Append(const Slice& data) { +IOStatus WritableFileWriter::Append(const Slice& data) { const char* src = data.data(); size_t left = data.size(); - Status s; + IOStatus s; pending_sync_ = true; TEST_KILL_RANDOM("WritableFileWriter::Append:0", @@ -94,7 +94,7 @@ Status WritableFileWriter::Append(const Slice& data) { return s; } -Status WritableFileWriter::Pad(const size_t pad_bytes) { +IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); @@ -107,7 +107,7 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) { buf_.PadWith(append_bytes, 0); left -= append_bytes; if (left > 0) { - Status s = Flush(); + IOStatus s = Flush(); if (!s.ok()) { return s; } @@ -116,12 +116,12 @@ Status WritableFileWriter::Pad(const size_t pad_bytes) { } pending_sync_ = true; filesize_ += pad_bytes; - return Status::OK(); + return IOStatus::OK(); } -Status WritableFileWriter::Close() { +IOStatus WritableFileWriter::Close() { // Do not quit immediately on failure the file MUST be closed - Status s; + IOStatus s; // Possible to close it twice now as we MUST close // in __dtor, simply flushing is not enough @@ -133,7 +133,7 @@ Status WritableFileWriter::Close() { s = Flush(); // flush cache to OS - Status interim; + IOStatus interim; // In direct I/O mode we write whole pages so // we need to let the file know where data ends. 
if (use_direct_io()) { @@ -160,8 +160,8 @@ Status WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled -Status WritableFileWriter::Flush() { - Status s; +IOStatus WritableFileWriter::Flush() { + IOStatus s; TEST_KILL_RANDOM("WritableFileWriter::Flush:0", rocksdb_kill_odds * REDUCE_ODDS2); @@ -224,8 +224,8 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const { } } -Status WritableFileWriter::Sync(bool use_fsync) { - Status s = Flush(); +IOStatus WritableFileWriter::Sync(bool use_fsync) { + IOStatus s = Flush(); if (!s.ok()) { return s; } @@ -238,23 +238,23 @@ Status WritableFileWriter::Sync(bool use_fsync) { } TEST_KILL_RANDOM("WritableFileWriter::Sync:1", rocksdb_kill_odds); pending_sync_ = false; - return Status::OK(); + return IOStatus::OK(); } -Status WritableFileWriter::SyncWithoutFlush(bool use_fsync) { +IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { if (!writable_file_->IsSyncThreadSafe()) { - return Status::NotSupported( + return IOStatus::NotSupported( "Can't WritableFileWriter::SyncWithoutFlush() because " "WritableFile::IsSyncThreadSafe() is false"); } TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); - Status s = SyncInternal(use_fsync); + IOStatus s = SyncInternal(use_fsync); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); return s; } -Status WritableFileWriter::SyncInternal(bool use_fsync) { - Status s; +IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { + IOStatus s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); auto prev_perf_level = GetPerfLevel(); @@ -268,7 +268,7 @@ Status WritableFileWriter::SyncInternal(bool use_fsync) { return s; } -Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { +IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); return 
writable_file_->RangeSync(offset, nbytes, IOOptions(), nullptr); @@ -276,8 +276,8 @@ Status WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // This method writes to disk the specified data and makes use of the rate // limiter if available -Status WritableFileWriter::WriteBuffered(const char* data, size_t size) { - Status s; +IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { + IOStatus s; assert(!use_direct_io()); const char* src = data; size_t left = size; @@ -352,9 +352,9 @@ void WritableFileWriter::CalculateFileChecksum(const Slice& data) { // only write on aligned // offsets. #ifndef ROCKSDB_LITE -Status WritableFileWriter::WriteDirect() { +IOStatus WritableFileWriter::WriteDirect() { assert(use_direct_io()); - Status s; + IOStatus s; const size_t alignment = buf_.Alignment(); assert((next_write_offset_ % alignment) == 0); diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 123110713..1a76bc1f8 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -15,6 +15,7 @@ #include "rocksdb/env.h" #include "rocksdb/file_checksum.h" #include "rocksdb/file_system.h" +#include "rocksdb/io_status.h" #include "rocksdb/listener.h" #include "rocksdb/rate_limiter.h" #include "test_util/sync_point.h" @@ -36,11 +37,11 @@ class WritableFileWriter { void NotifyOnFileWriteFinish(uint64_t offset, size_t length, const FileOperationInfo::TimePoint& start_ts, const FileOperationInfo::TimePoint& finish_ts, - const Status& status) { + const IOStatus& io_status) { FileOperationInfo info(file_name_, start_ts, finish_ts); info.offset = offset; info.length = length; - info.status = status; + info.status = io_status; for (auto& listener : listeners_) { listener->OnFileWriteFinish(info); @@ -122,24 +123,24 @@ class WritableFileWriter { std::string file_name() const { return file_name_; } - Status Append(const Slice& data); + IOStatus Append(const Slice& data); - Status Pad(const size_t pad_bytes); + 
IOStatus Pad(const size_t pad_bytes); - Status Flush(); + IOStatus Flush(); - Status Close(); + IOStatus Close(); - Status Sync(bool use_fsync); + IOStatus Sync(bool use_fsync); // Sync only the data that was already Flush()ed. Safe to call concurrently // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), // returns NotSupported status. - Status SyncWithoutFlush(bool use_fsync); + IOStatus SyncWithoutFlush(bool use_fsync); uint64_t GetFileSize() const { return filesize_; } - Status InvalidateCache(size_t offset, size_t length) { + IOStatus InvalidateCache(size_t offset, size_t length) { return writable_file_->InvalidateCache(offset, length); } @@ -161,11 +162,11 @@ class WritableFileWriter { // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE - Status WriteDirect(); + IOStatus WriteDirect(); #endif // !ROCKSDB_LITE // Normal write - Status WriteBuffered(const char* data, size_t size); - Status RangeSync(uint64_t offset, uint64_t nbytes); - Status SyncInternal(bool use_fsync); + IOStatus WriteBuffered(const char* data, size_t size); + IOStatus RangeSync(uint64_t offset, uint64_t nbytes); + IOStatus SyncInternal(bool use_fsync); }; } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index 66b94680e..bcd0e4d11 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -1393,8 +1393,13 @@ class FSDirectoryWrapper : public FSDirectory { FSDirectory* target_; }; +// A utility routine: write "data" to the named file. 
+extern IOStatus WriteStringToFile(FileSystem* fs, const Slice& data, + const std::string& fname, + bool should_sync = false); + // A utility routine: read contents of named file into *data -extern Status ReadFileToString(FileSystem* fs, const std::string& fname, - std::string* data); +extern IOStatus ReadFileToString(FileSystem* fs, const std::string& fname, + std::string* data); } // namespace ROCKSDB_NAMESPACE diff --git a/include/rocksdb/io_status.h b/include/rocksdb/io_status.h index 6a6287240..6dd7be11f 100644 --- a/include/rocksdb/io_status.h +++ b/include/rocksdb/io_status.h @@ -227,4 +227,19 @@ inline bool IOStatus::operator!=(const IOStatus& rhs) const { return !(*this == rhs); } +inline IOStatus status_to_io_status(Status&& status) { + if (status.ok()) { + // Fast path + return IOStatus::OK(); + } else { + const char* state = status.getState(); + if (state) { + return IOStatus(status.code(), status.subcode(), + Slice(state, strlen(status.getState()) + 1), Slice()); + } else { + return IOStatus(status.code(), status.subcode()); + } + } +} + } // namespace ROCKSDB_NAMESPACE diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc index a9529dd4e..9db331de9 100644 --- a/table/block_based/block_based_table_builder.cc +++ b/table/block_based/block_based_table_builder.cc @@ -284,6 +284,7 @@ struct BlockBasedTableBuilder::Rep { WritableFileWriter* file; uint64_t offset = 0; Status status; + IOStatus io_status; size_t alignment; BlockBuilder data_block; // Buffers uncompressed data blocks and keys to replay later. 
Needed when @@ -725,8 +726,9 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, handle->set_offset(r->offset); handle->set_size(block_contents.size()); assert(r->status.ok()); - r->status = r->file->Append(block_contents); - if (r->status.ok()) { + assert(r->io_status.ok()); + r->io_status = r->file->Append(block_contents); + if (r->io_status.ok()) { char trailer[kBlockTrailerSize]; trailer[0] = type; char* trailer_without_type = trailer + 1; @@ -765,32 +767,35 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, } } - assert(r->status.ok()); + assert(r->io_status.ok()); TEST_SYNC_POINT_CALLBACK( "BlockBasedTableBuilder::WriteRawBlock:TamperWithChecksum", static_cast(trailer)); - r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); - if (r->status.ok()) { + r->io_status = r->file->Append(Slice(trailer, kBlockTrailerSize)); + if (r->io_status.ok()) { r->status = InsertBlockInCache(block_contents, type, handle); } - if (r->status.ok()) { + if (r->status.ok() && r->io_status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; if (r->table_options.block_align && is_data_block) { size_t pad_bytes = (r->alignment - ((block_contents.size() + kBlockTrailerSize) & (r->alignment - 1))) & (r->alignment - 1); - r->status = r->file->Pad(pad_bytes); - if (r->status.ok()) { + r->io_status = r->file->Pad(pad_bytes); + if (r->io_status.ok()) { r->offset += pad_bytes; } } } } + r->status = r->io_status; } Status BlockBasedTableBuilder::status() const { return rep_->status; } +IOStatus BlockBasedTableBuilder::io_status() const { return rep_->io_status; } + // // Make a copy of the block contents and insert into compressed block cache // @@ -1050,10 +1055,12 @@ void BlockBasedTableBuilder::WriteFooter(BlockHandle& metaindex_block_handle, std::string footer_encoding; footer.EncodeTo(&footer_encoding); assert(r->status.ok()); - r->status = r->file->Append(footer_encoding); - if (r->status.ok()) { + 
assert(r->io_status.ok()); + r->io_status = r->file->Append(footer_encoding); + if (r->io_status.ok()) { r->offset += footer_encoding.size(); } + r->status = r->io_status; } void BlockBasedTableBuilder::EnterUnbuffered() { diff --git a/table/block_based/block_based_table_builder.h b/table/block_based/block_based_table_builder.h index 97c9bc65a..b85be99d8 100644 --- a/table/block_based/block_based_table_builder.h +++ b/table/block_based/block_based_table_builder.h @@ -68,6 +68,9 @@ class BlockBasedTableBuilder : public TableBuilder { // Return non-ok iff some error has been detected. Status status() const override; + // Return non-ok iff some error happens during IO. + IOStatus io_status() const override; + // Finish building the table. Stops using the file passed to the // constructor after this function returns. // REQUIRES: Finish(), Abandon() have not been called diff --git a/table/cuckoo/cuckoo_table_builder.cc b/table/cuckoo/cuckoo_table_builder.cc index 3ddce0b6f..7d3086a61 100644 --- a/table/cuckoo/cuckoo_table_builder.cc +++ b/table/cuckoo/cuckoo_table_builder.cc @@ -252,9 +252,9 @@ Status CuckooTableBuilder::Finish() { hash_table_size_ = static_cast(num_entries_ / max_hash_table_ratio_); } - s = MakeHashTable(&buckets); - if (!s.ok()) { - return s; + status_ = MakeHashTable(&buckets); + if (!status_.ok()) { + return status_; } // Determine unused_user_key to fill empty buckets. 
std::string unused_user_key = smallest_user_key_; @@ -301,18 +301,19 @@ Status CuckooTableBuilder::Finish() { uint32_t num_added = 0; for (auto& bucket : buckets) { if (bucket.vector_idx == kMaxVectorIdx) { - s = file_->Append(Slice(unused_bucket)); + io_status_ = file_->Append(Slice(unused_bucket)); } else { ++num_added; - s = file_->Append(GetKey(bucket.vector_idx)); - if (s.ok()) { + io_status_ = file_->Append(GetKey(bucket.vector_idx)); + if (io_status_.ok()) { if (value_size_ > 0) { - s = file_->Append(GetValue(bucket.vector_idx)); + io_status_ = file_->Append(GetValue(bucket.vector_idx)); } } } - if (!s.ok()) { - return s; + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } } assert(num_added == NumEntries()); @@ -364,10 +365,11 @@ Status CuckooTableBuilder::Finish() { BlockHandle property_block_handle; property_block_handle.set_offset(offset); property_block_handle.set_size(property_block.size()); - s = file_->Append(property_block); + io_status_ = file_->Append(property_block); offset += property_block.size(); - if (!s.ok()) { - return s; + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } meta_index_builder.Add(kPropertiesBlock, property_block_handle); @@ -376,9 +378,10 @@ Status CuckooTableBuilder::Finish() { BlockHandle meta_index_block_handle; meta_index_block_handle.set_offset(offset); meta_index_block_handle.set_size(meta_index_block.size()); - s = file_->Append(meta_index_block); - if (!s.ok()) { - return s; + io_status_ = file_->Append(meta_index_block); + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } Footer footer(kCuckooTableMagicNumber, 1); @@ -386,12 +389,13 @@ Status CuckooTableBuilder::Finish() { footer.set_index_handle(BlockHandle::NullBlockHandle()); std::string footer_encoding; footer.EncodeTo(&footer_encoding); - s = file_->Append(footer_encoding); + io_status_ = file_->Append(footer_encoding); if (file_ != nullptr) { file_checksum_ = file_->GetFileChecksum(); } - return s; + 
status_ = io_status_; + return status_; } void CuckooTableBuilder::Abandon() { diff --git a/table/cuckoo/cuckoo_table_builder.h b/table/cuckoo/cuckoo_table_builder.h index d41dfed79..ab2ff3c5f 100644 --- a/table/cuckoo/cuckoo_table_builder.h +++ b/table/cuckoo/cuckoo_table_builder.h @@ -46,6 +46,9 @@ class CuckooTableBuilder: public TableBuilder { // Return non-ok iff some error has been detected. Status status() const override { return status_; } + // Return non-ok iff some error happens during IO. + IOStatus io_status() const override { return io_status_; } + // Finish building the table. Stops using the file passed to the // constructor after this function returns. // REQUIRES: Finish(), Abandon() have not been called @@ -116,6 +119,7 @@ class CuckooTableBuilder: public TableBuilder { // Number of keys that contain value (non-deletion op) uint64_t num_values_; Status status_; + IOStatus io_status_; TableProperties properties_; const Comparator* ucomp_; bool use_module_hash_; diff --git a/table/mock_table.h b/table/mock_table.h index 9e80c8d04..a99d6e4a9 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -15,6 +15,7 @@ #include "db/version_edit.h" #include "port/port.h" #include "rocksdb/comparator.h" +#include "rocksdb/io_status.h" #include "rocksdb/table.h" #include "table/internal_iterator.h" #include "table/table_builder.h" @@ -138,6 +139,9 @@ class MockTableBuilder : public TableBuilder { // Return non-ok iff some error has been detected. Status status() const override { return Status::OK(); } + // Return non-ok iff some error happens during IO. 
+ IOStatus io_status() const override { return IOStatus::OK(); } + Status Finish() override { MutexLock lock_guard(&file_system_->mutex); file_system_->files.insert({id_, table_}); diff --git a/table/plain/plain_table_builder.cc b/table/plain/plain_table_builder.cc index 147e46db2..d2d6babf7 100644 --- a/table/plain/plain_table_builder.cc +++ b/table/plain/plain_table_builder.cc @@ -36,16 +36,16 @@ namespace { // a utility that helps writing block content to the file // @offset will advance if @block_contents was successfully written. // @block_handle the block handle this particular block. -Status WriteBlock(const Slice& block_contents, WritableFileWriter* file, - uint64_t* offset, BlockHandle* block_handle) { +IOStatus WriteBlock(const Slice& block_contents, WritableFileWriter* file, + uint64_t* offset, BlockHandle* block_handle) { block_handle->set_offset(*offset); block_handle->set_size(block_contents.size()); - Status s = file->Append(block_contents); + IOStatus io_s = file->Append(block_contents); - if (s.ok()) { + if (io_s.ok()) { *offset += block_contents.size(); } - return s; + return io_s; } } // namespace @@ -145,41 +145,46 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) { assert(offset_ <= std::numeric_limits::max()); auto prev_offset = static_cast(offset_); // Write out the key - encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf, - &meta_bytes_buf_size); + io_status_ = encoder_.AppendKey(key, file_, &offset_, meta_bytes_buf, + &meta_bytes_buf_size); if (SaveIndexInFile()) { index_builder_->AddKeyPrefix(GetPrefix(internal_key), prev_offset); } // Write value length uint32_t value_size = static_cast(value.size()); - char* end_ptr = - EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size); - assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf)); - meta_bytes_buf_size = end_ptr - meta_bytes_buf; - file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size)); + if (io_status_.ok()) { + char* end_ptr = + 
EncodeVarint32(meta_bytes_buf + meta_bytes_buf_size, value_size); + assert(end_ptr <= meta_bytes_buf + sizeof(meta_bytes_buf)); + meta_bytes_buf_size = end_ptr - meta_bytes_buf; + io_status_ = file_->Append(Slice(meta_bytes_buf, meta_bytes_buf_size)); + } // Write value - file_->Append(value); - offset_ += value_size + meta_bytes_buf_size; - - properties_.num_entries++; - properties_.raw_key_size += key.size(); - properties_.raw_value_size += value.size(); - if (internal_key.type == kTypeDeletion || - internal_key.type == kTypeSingleDeletion) { - properties_.num_deletions++; - } else if (internal_key.type == kTypeMerge) { - properties_.num_merge_operands++; + if (io_status_.ok()) { + io_status_ = file_->Append(value); + offset_ += value_size + meta_bytes_buf_size; + } + + if (io_status_.ok()) { + properties_.num_entries++; + properties_.raw_key_size += key.size(); + properties_.raw_value_size += value.size(); + if (internal_key.type == kTypeDeletion || + internal_key.type == kTypeSingleDeletion) { + properties_.num_deletions++; + } else if (internal_key.type == kTypeMerge) { + properties_.num_merge_operands++; + } } // notify property collectors NotifyCollectTableCollectorsOnAdd( key, value, offset_, table_properties_collectors_, ioptions_.info_log); + status_ = io_status_; } -Status PlainTableBuilder::status() const { return status_; } - Status PlainTableBuilder::Finish() { assert(!closed_); closed_ = true; @@ -214,10 +219,12 @@ Status PlainTableBuilder::Finish() { Slice bloom_finish_result = bloom_block_.Finish(); properties_.filter_size = bloom_finish_result.size(); - s = WriteBlock(bloom_finish_result, file_, &offset_, &bloom_block_handle); + io_status_ = + WriteBlock(bloom_finish_result, file_, &offset_, &bloom_block_handle); - if (!s.ok()) { - return s; + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } meta_index_builer.Add(BloomBlockBuilder::kBloomBlock, bloom_block_handle); } @@ -225,10 +232,12 @@ Status PlainTableBuilder::Finish() { 
Slice index_finish_result = index_builder_->Finish(); properties_.index_size = index_finish_result.size(); - s = WriteBlock(index_finish_result, file_, &offset_, &index_block_handle); + io_status_ = + WriteBlock(index_finish_result, file_, &offset_, &index_block_handle); - if (!s.ok()) { - return s; + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } meta_index_builer.Add(PlainTableIndexBuilder::kPlainTableIndexBlock, @@ -249,27 +258,24 @@ Status PlainTableBuilder::Finish() { // -- Write property block BlockHandle property_block_handle; - auto s = WriteBlock( + IOStatus s = WriteBlock( property_block_builder.Finish(), file_, &offset_, &property_block_handle ); if (!s.ok()) { - return s; + return std::move(s); } meta_index_builer.Add(kPropertiesBlock, property_block_handle); // -- write metaindex block BlockHandle metaindex_block_handle; - s = WriteBlock( - meta_index_builer.Finish(), - file_, - &offset_, - &metaindex_block_handle - ); - if (!s.ok()) { - return s; + io_status_ = WriteBlock(meta_index_builer.Finish(), file_, &offset_, + &metaindex_block_handle); + if (!io_status_.ok()) { + status_ = io_status_; + return status_; } // Write Footer @@ -279,15 +285,16 @@ Status PlainTableBuilder::Finish() { footer.set_index_handle(BlockHandle::NullBlockHandle()); std::string footer_encoding; footer.EncodeTo(&footer_encoding); - s = file_->Append(footer_encoding); - if (s.ok()) { + io_status_ = file_->Append(footer_encoding); + if (io_status_.ok()) { offset_ += footer_encoding.size(); } if (file_ != nullptr) { file_checksum_ = file_->GetFileChecksum(); } - return s; + status_ = io_status_; + return status_; } void PlainTableBuilder::Abandon() { diff --git a/table/plain/plain_table_builder.h b/table/plain/plain_table_builder.h index fe2bf3cf9..d1d9e7785 100644 --- a/table/plain/plain_table_builder.h +++ b/table/plain/plain_table_builder.h @@ -59,7 +59,10 @@ class PlainTableBuilder: public TableBuilder { void Add(const Slice& key, const Slice& value) 
override; // Return non-ok iff some error has been detected. - Status status() const override; + Status status() const override { return status_; } + + // Return non-ok iff some error happens during IO. + IOStatus io_status() const override { return io_status_; } // Finish building the table. Stops using the file passed to the // constructor after this function returns. @@ -105,6 +108,7 @@ class PlainTableBuilder: public TableBuilder { uint32_t bloom_bits_per_key_; size_t huge_page_tlb_size_; Status status_; + IOStatus io_status_; TableProperties properties_; PlainTableKeyEncoder encoder_; diff --git a/table/plain/plain_table_key_coding.cc b/table/plain/plain_table_key_coding.cc index fa3fae1dc..c88a2dff6 100644 --- a/table/plain/plain_table_key_coding.cc +++ b/table/plain/plain_table_key_coding.cc @@ -80,13 +80,13 @@ inline Status PlainTableKeyDecoder::DecodeSize(uint32_t start_offset, } } -Status PlainTableKeyEncoder::AppendKey(const Slice& key, - WritableFileWriter* file, - uint64_t* offset, char* meta_bytes_buf, - size_t* meta_bytes_buf_size) { +IOStatus PlainTableKeyEncoder::AppendKey(const Slice& key, + WritableFileWriter* file, + uint64_t* offset, char* meta_bytes_buf, + size_t* meta_bytes_buf_size) { ParsedInternalKey parsed_key; if (!ParseInternalKey(key, &parsed_key)) { - return Status::Corruption(Slice()); + return IOStatus::Corruption(Slice()); } Slice key_to_write = key; // Portion of internal key to write out. 
@@ -99,9 +99,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key, char* ptr = EncodeVarint32(key_size_buf, user_key_size); assert(ptr <= key_size_buf + sizeof(key_size_buf)); auto len = ptr - key_size_buf; - Status s = file->Append(Slice(key_size_buf, len)); - if (!s.ok()) { - return s; + IOStatus io_s = file->Append(Slice(key_size_buf, len)); + if (!io_s.ok()) { + return io_s; } *offset += len; } @@ -117,9 +117,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key, key_count_for_prefix_ = 1; pre_prefix_.SetUserKey(prefix); size_bytes_pos += EncodeSize(kFullKey, user_key_size, size_bytes); - Status s = file->Append(Slice(size_bytes, size_bytes_pos)); - if (!s.ok()) { - return s; + IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos)); + if (!io_s.ok()) { + return io_s; } *offset += size_bytes_pos; } else { @@ -135,9 +135,9 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key, static_cast(pre_prefix_.GetUserKey().size()); size_bytes_pos += EncodeSize(kKeySuffix, user_key_size - prefix_len, size_bytes + size_bytes_pos); - Status s = file->Append(Slice(size_bytes, size_bytes_pos)); - if (!s.ok()) { - return s; + IOStatus io_s = file->Append(Slice(size_bytes, size_bytes_pos)); + if (!io_s.ok()) { + return io_s; } *offset += size_bytes_pos; key_to_write = Slice(key.data() + prefix_len, key.size() - prefix_len); @@ -149,20 +149,23 @@ Status PlainTableKeyEncoder::AppendKey(const Slice& key, // If the row is of value type with seqId 0, flush the special flag together // in this buffer to safe one file append call, which takes 1 byte. 
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) { - Status s = + IOStatus io_s = file->Append(Slice(key_to_write.data(), key_to_write.size() - 8)); - if (!s.ok()) { - return s; + if (!io_s.ok()) { + return io_s; } *offset += key_to_write.size() - 8; meta_bytes_buf[*meta_bytes_buf_size] = PlainTableFactory::kValueTypeSeqId0; *meta_bytes_buf_size += 1; } else { - file->Append(key_to_write); + IOStatus io_s = file->Append(key_to_write); + if (!io_s.ok()) { + return io_s; + } *offset += key_to_write.size(); } - return Status::OK(); + return IOStatus::OK(); } Slice PlainTableFileReader::GetFromBuffer(Buffer* buffer, uint32_t file_offset, diff --git a/table/plain/plain_table_key_coding.h b/table/plain/plain_table_key_coding.h index d1460837d..2c345ad73 100644 --- a/table/plain/plain_table_key_coding.h +++ b/table/plain/plain_table_key_coding.h @@ -44,8 +44,9 @@ class PlainTableKeyEncoder { // meta_bytes_buf: buffer for extra meta bytes // meta_bytes_buf_size: offset to append extra meta bytes. Will be updated // if meta_bytes_buf is updated. - Status AppendKey(const Slice& key, WritableFileWriter* file, uint64_t* offset, - char* meta_bytes_buf, size_t* meta_bytes_buf_size); + IOStatus AppendKey(const Slice& key, WritableFileWriter* file, + uint64_t* offset, char* meta_bytes_buf, + size_t* meta_bytes_buf_size); // Return actual encoding type to be picked EncodingType GetEncodingType() { return encoding_type_; } diff --git a/table/table_builder.h b/table/table_builder.h index 541251073..3e2aacb86 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -136,6 +136,9 @@ class TableBuilder { // Return non-ok iff some error has been detected. virtual Status status() const = 0; + // Return non-ok iff some error happens during IO. + virtual IOStatus io_status() const = 0; + // Finish building the table. // REQUIRES: Finish(), Abandon() have not been called virtual Status Finish() = 0;