diff --git a/HISTORY.md b/HISTORY.md index 5b6366e0d..842e048d0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -31,6 +31,7 @@ * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. * Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base. +* If an error is hit when writing to a file (append, sync, etc), RocksDB is more strict with not issuing more operations to it, except closing the file, with exceptions of some WAL file operations in error recovery path. ### Performance Improvements * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables. diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index 2dabc98e8..9dbac7f25 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -70,32 +70,38 @@ Status BlobLogWriter::AppendFooter(BlobLogFooter& footer, std::string str; footer.EncodeTo(&str); - Status s = dest_->Append(Slice(str)); - if (s.ok()) { - block_offset_ += str.size(); - - s = Sync(); - + Status s; + if (dest_->seen_error()) { + s.PermitUncheckedError(); + return Status::IOError("Seen Error. Skip closing."); + } else { + s = dest_->Append(Slice(str)); if (s.ok()) { - s = dest_->Close(); + block_offset_ += str.size(); + + s = Sync(); if (s.ok()) { - assert(!!checksum_method == !!checksum_value); + s = dest_->Close(); - if (checksum_method) { - assert(checksum_method->empty()); + if (s.ok()) { + assert(!!checksum_method == !!checksum_value); - std::string method = dest_->GetFileChecksumFuncName(); - if (method != kUnknownFileChecksumFuncName) { - *checksum_method = std::move(method); + if (checksum_method) { + assert(checksum_method->empty()); + + std::string method = dest_->GetFileChecksumFuncName(); + if (method != kUnknownFileChecksumFuncName) { + *checksum_method = std::move(method); + } } - } - if (checksum_value) { - assert(checksum_value->empty()); + if (checksum_value) { + assert(checksum_value->empty()); - std::string value = dest_->GetFileChecksum(); - if (value != kUnknownFileChecksum) { - *checksum_value = std::move(value); + std::string value = dest_->GetFileChecksum(); + if (value != kUnknownFileChecksum) { + *checksum_value = std::move(value); + } } } } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 49ce9d3a0..150e52338 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -109,12 +109,18 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, log->get_log_number()); + if (error_handler_.IsRecoveryInProgress()) { + log->file()->reset_seen_error(); + } io_s = log->file()->Sync(immutable_db_options_.use_fsync); if (!io_s.ok()) { break; } if (immutable_db_options_.recycle_log_file_num > 0) { + if (error_handler_.IsRecoveryInProgress()) { + log->file()->reset_seen_error(); + } io_s = log->Close(); if (!io_s.ok()) { break; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index baa2f7460..0ab334bb6 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1089,6 +1089,9 @@ void DBImpl::IOStatusCheck(const IOStatus& io_status) { // Maybe change the return status to void? error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); + } else { + // Force writable file to be continue writable. + logs_.back().writer->file()->reset_seen_error(); } } @@ -2084,6 +2087,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; + if (error_handler_.IsRecoveryInProgress()) { + // In recovery path, we force another try of writing WAL buffer. + cur_log_writer->file()->reset_seen_error(); + } io_s = cur_log_writer->WriteBuffer(); if (s.ok()) { s = io_s; diff --git a/db/log_writer.cc b/db/log_writer.cc index 807131cfc..faa9ad089 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -13,6 +13,7 @@ #include "file/writable_file_writer.h" #include "rocksdb/env.h" +#include "rocksdb/io_status.h" #include "util/coding.h" #include "util/crc32c.h" @@ -44,7 +45,12 @@ Writer::~Writer() { } } -IOStatus Writer::WriteBuffer() { return dest_->Flush(); } +IOStatus Writer::WriteBuffer() { + if (dest_->seen_error()) { + return IOStatus::IOError("Seen error. Skip writing buffer."); + } + return dest_->Flush(); +} IOStatus Writer::Close() { IOStatus s; diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 010136247..49b095937 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -281,7 +281,7 @@ void TestCustomizedTablePropertiesCollector( builder->Add(ikey.Encode(), kv.second); } ASSERT_OK(builder->Finish()); - writer->Flush(); + ASSERT_OK(writer->Flush()); // -- Step 2: Read properties test::StringSink* fwf = @@ -421,7 +421,7 @@ void TestInternalKeyPropertiesCollector( } ASSERT_OK(builder->Finish()); - writable->Flush(); + ASSERT_OK(writable->Flush()); test::StringSink* fwf = static_cast(writable->writable_file()); diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 410c2bc65..cedd6fe98 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3226,7 +3226,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, InternalKey ikey(info.key, 0, ValueType::kTypeValue); builder->Add(ikey.Encode(), "value"); ASSERT_OK(builder->Finish()); - fwriter->Flush(); + ASSERT_OK(fwriter->Flush()); uint64_t file_size = 0; s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); ASSERT_OK(s); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 3b98ea658..7056d6a2f 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -16,6 +16,7 @@ #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "rocksdb/io_status.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" #include "util/crc32c.h" @@ -23,6 +24,13 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { +namespace { +IOStatus AssertFalseAndGetStatusForPrevError() { + assert(false); + return IOStatus::IOError("Writer has previous error."); +} +} // namespace + IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, @@ -43,6 +51,10 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -85,6 +97,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (buf_.CurrentSize() > 0) { s = Flush(op_rate_limiter_priority); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -165,12 +178,17 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (s.ok()) { uint64_t cur_size = filesize_.load(std::memory_order_acquire); filesize_.store(cur_size + data.size(), std::memory_order_release); + } else { + seen_error_ = true; } return s; } IOStatus WritableFileWriter::Pad(const size_t pad_bytes, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); @@ -186,6 +204,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, if (left > 0) { IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -203,17 +222,31 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, } IOStatus WritableFileWriter::Close() { + if (seen_error_) { + IOStatus interim; + if (writable_file_.get() != nullptr) { + interim = writable_file_->Close(IOOptions(), nullptr); + writable_file_.reset(); + } + if (interim.ok()) { + return IOStatus::IOError( + "File is closed but data not flushed as writer has previous error."); + } else { + return interim; + } + } + // Do not quit immediately on failure the file MUST be closed - IOStatus s; // Possible to close it twice now as we MUST close // in __dtor, simply flushing is not enough // Windows when pre-allocating does not fill with zeros // also with unbuffered access we also set the end of data. if (writable_file_.get() == nullptr) { - return s; + return IOStatus::OK(); } + IOStatus s; s = Flush(); // flush cache to OS IOStatus interim; @@ -294,9 +327,13 @@ IOStatus WritableFileWriter::Close() { writable_file_.reset(); TEST_KILL_RANDOM("WritableFileWriter::Close:1"); - if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) { - checksum_generator_->Finalize(); - checksum_finalized_ = true; + if (s.ok()) { + if (checksum_generator_ != nullptr && !checksum_finalized_) { + checksum_generator_->Finalize(); + checksum_finalized_ = true; + } + } else { + seen_error_ = true; } return s; @@ -305,6 +342,10 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2); @@ -329,6 +370,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } } if (!s.ok()) { + seen_error_ = true; return s; } } @@ -357,6 +399,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } if (!s.ok()) { + seen_error_ = true; return s; } @@ -383,6 +426,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { if (offset_sync_to > 0 && offset_sync_to - last_sync_size_ >= bytes_per_sync_) { s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + if (!s.ok()) { + seen_error_ = true; + } last_sync_size_ = offset_sync_to; } } @@ -409,14 +455,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const { } IOStatus WritableFileWriter::Sync(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s = Flush(); if (!s.ok()) { + seen_error_ = true; return s; } TEST_KILL_RANDOM("WritableFileWriter::Sync:0"); if (!use_direct_io() && pending_sync_) { s = SyncInternal(use_fsync); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -426,6 +478,10 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) { } IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + if (!writable_file_->IsSyncThreadSafe()) { return IOStatus::NotSupported( "Can't WritableFileWriter::SyncWithoutFlush() because " @@ -434,10 +490,17 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); IOStatus s = SyncInternal(use_fsync); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); @@ -473,10 +536,17 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { } #endif SetPerfLevel(prev_perf_level); + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); #ifndef ROCKSDB_LITE @@ -488,6 +558,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOOptions io_options; io_options.rate_limiter_priority = writable_file_->GetIOPriority(); IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); + if (!s.ok()) { + seen_error_ = true; + } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -505,6 +578,10 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // limiter if available IOStatus WritableFileWriter::WriteBuffered( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; assert(!use_direct_io()); const char* src = data; @@ -576,6 +653,7 @@ IOStatus WritableFileWriter::WriteBuffered( } #endif if (!s.ok()) { + seen_error_ = true; return s; } } @@ -590,11 +668,18 @@ IOStatus WritableFileWriter::WriteBuffered( } buf_.Size(0); buffered_data_crc32c_checksum_ = 0; + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::WriteBufferedWithChecksum( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; assert(!use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); @@ -666,6 +751,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( // and let caller determine error handling. buf_.Size(0); buffered_data_crc32c_checksum_ = 0; + seen_error_ = true; return s; } } @@ -679,6 +765,9 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( buffered_data_crc32c_checksum_ = 0; uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); flushed_size_.store(cur_size + left, std::memory_order_release); + if (!s.ok()) { + seen_error_ = true; + } return s; } @@ -712,6 +801,12 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, #ifndef ROCKSDB_LITE IOStatus WritableFileWriter::WriteDirect( Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + assert(false); + + return IOStatus::IOError("Writer has previous error."); + } + assert(use_direct_io()); IOStatus s; const size_t alignment = buf_.Alignment(); @@ -778,6 +873,7 @@ IOStatus WritableFileWriter::WriteDirect( } if (!s.ok()) { buf_.Size(file_advance + leftover_tail); + seen_error_ = true; return s; } } @@ -801,12 +897,18 @@ IOStatus WritableFileWriter::WriteDirect( // is a multiple of whole pages otherwise filesize_ is leftover_tail // behind next_write_offset_ += file_advance; + } else { + seen_error_ = true; } return s; } IOStatus WritableFileWriter::WriteDirectWithChecksum( Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + assert(use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); IOStatus s; @@ -884,6 +986,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( buf_.Size(file_advance + leftover_tail); buffered_data_crc32c_checksum_ = crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + seen_error_ = true; return s; } } @@ -907,6 +1010,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( // is a multiple of whole pages otherwise filesize_ is leftover_tail // behind next_write_offset_ += file_advance; + } else { + seen_error_ = true; } return s; } diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index bb9e5a6a1..8a0e52db4 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -151,6 +151,7 @@ class WritableFileWriter { uint64_t next_write_offset_; #endif // ROCKSDB_LITE bool pending_sync_; + bool seen_error_; uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; @@ -186,6 +187,7 @@ class WritableFileWriter { next_write_offset_(0), #endif // ROCKSDB_LITE pending_sync_(false), + seen_error_(false), last_sync_size_(0), bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter), @@ -288,6 +290,11 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; + bool seen_error() const { return seen_error_; } + // For options of relaxed consistency, users might hope to continue + // operating on the file after an error happens. + void reset_seen_error() { seen_error_ = false; } + private: // Decide the Rate Limiter priority. static Env::IOPriority DecideRateLimiterPriority( diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 3f1aed7b5..f5fecfa23 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -567,7 +567,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, EXPECT_TRUE(builder->status().ok()); Status s = builder->Finish(); - file_writer->Flush(); + ASSERT_OK(file_writer->Flush()); EXPECT_TRUE(s.ok()) << s.ToString(); EXPECT_EQ(sink->contents().size(), builder->FileSize()); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index bf9c09244..6c4e2a9a0 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -895,12 +895,14 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) { fwf->CheckCounters(1, 0); ASSERT_EQ(listener->NotifyErrorCount(), 1); + file_writer->reset_seen_error(); fwf->SetIOError(true); ASSERT_NOK(file_writer->Flush()); fwf->CheckCounters(1, 1); ASSERT_EQ(listener->NotifyErrorCount(), 2); /* No error generation */ + file_writer->reset_seen_error(); fwf->SetIOError(false); ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b'))); ASSERT_EQ(listener->NotifyErrorCount(), 2);