From 911c0208b917d6e2389f48ad40ec7df58b2c016b Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 10 Aug 2022 10:19:20 -0700 Subject: [PATCH] WritableFileWriter tries to skip operations after failure (#10489) Summary: A flag in WritableFileWriter is introduced to remember that an error has happened. Subsequent operations will fail with an assertion. Those operations, except Close(), are not supposed to be called anyway. This change will help catch bugs in tests and stress tests and limit the damage of a potential bug that continues writing to a file after a failure. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10489 Test Plan: Fix existing unit tests and watch crash tests for a while. Reviewed By: anand1976 Differential Revision: D38473277 fbshipit-source-id: 09aafb971e56cfd7f9ef92ad15b883f54acf1366 --- HISTORY.md | 1 + db/blob/blob_log_writer.cc | 44 ++++--- db/db_impl/db_impl_compaction_flush.cc | 6 + db/db_impl/db_impl_write.cc | 7 ++ db/log_writer.cc | 8 +- db/table_properties_collector_test.cc | 4 +- db/version_set_test.cc | 2 +- file/writable_file_writer.cc | 115 +++++++++++++++++- file/writable_file_writer.h | 7 ++ .../block_based/data_block_hash_index_test.cc | 2 +- util/file_reader_writer_test.cc | 2 + 11 files changed, 169 insertions(+), 29 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5b6366e0d..842e048d0 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -31,6 +31,7 @@ * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. * Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification.
If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base. +* If an error is hit when writing to a file (append, sync, etc.), RocksDB is now stricter about not issuing further operations to it other than closing the file, with the exception of some WAL file operations in the error recovery path. ### Performance Improvements * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables. diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc index 2dabc98e8..9dbac7f25 100644 --- a/db/blob/blob_log_writer.cc +++ b/db/blob/blob_log_writer.cc @@ -70,32 +70,38 @@ Status BlobLogWriter::AppendFooter(BlobLogFooter& footer, std::string str; footer.EncodeTo(&str); - Status s = dest_->Append(Slice(str)); - if (s.ok()) { - block_offset_ += str.size(); - - s = Sync(); - + Status s; + if (dest_->seen_error()) { + s.PermitUncheckedError(); + return Status::IOError("Seen Error. 
Skip closing."); + } else { + s = dest_->Append(Slice(str)); if (s.ok()) { - s = dest_->Close(); + block_offset_ += str.size(); + + s = Sync(); if (s.ok()) { - assert(!!checksum_method == !!checksum_value); + s = dest_->Close(); - if (checksum_method) { - assert(checksum_method->empty()); + if (s.ok()) { + assert(!!checksum_method == !!checksum_value); - std::string method = dest_->GetFileChecksumFuncName(); - if (method != kUnknownFileChecksumFuncName) { - *checksum_method = std::move(method); + if (checksum_method) { + assert(checksum_method->empty()); + + std::string method = dest_->GetFileChecksumFuncName(); + if (method != kUnknownFileChecksumFuncName) { + *checksum_method = std::move(method); + } } - } - if (checksum_value) { - assert(checksum_value->empty()); + if (checksum_value) { + assert(checksum_value->empty()); - std::string value = dest_->GetFileChecksum(); - if (value != kUnknownFileChecksum) { - *checksum_value = std::move(value); + std::string value = dest_->GetFileChecksum(); + if (value != kUnknownFileChecksum) { + *checksum_value = std::move(value); + } } } } diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 49ce9d3a0..150e52338 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -109,12 +109,18 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context, ROCKS_LOG_INFO(immutable_db_options_.info_log, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, log->get_log_number()); + if (error_handler_.IsRecoveryInProgress()) { + log->file()->reset_seen_error(); + } io_s = log->file()->Sync(immutable_db_options_.use_fsync); if (!io_s.ok()) { break; } if (immutable_db_options_.recycle_log_file_num > 0) { + if (error_handler_.IsRecoveryInProgress()) { + log->file()->reset_seen_error(); + } io_s = log->Close(); if (!io_s.ok()) { break; diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index baa2f7460..0ab334bb6 100644 --- 
a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1089,6 +1089,9 @@ void DBImpl::IOStatusCheck(const IOStatus& io_status) { // Maybe change the return status to void? error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); mutex_.Unlock(); + } else { + // Force the writable file to remain writable. + logs_.back().writer->file()->reset_seen_error(); } } @@ -2084,6 +2087,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { if (!logs_.empty()) { // Alway flush the buffer of the last log before switching to a new one log::Writer* cur_log_writer = logs_.back().writer; + if (error_handler_.IsRecoveryInProgress()) { + // In the recovery path, force another attempt at writing the WAL buffer. + cur_log_writer->file()->reset_seen_error(); + } io_s = cur_log_writer->WriteBuffer(); if (s.ok()) { s = io_s; diff --git a/db/log_writer.cc b/db/log_writer.cc index 807131cfc..faa9ad089 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -13,6 +13,7 @@ #include "file/writable_file_writer.h" #include "rocksdb/env.h" +#include "rocksdb/io_status.h" #include "util/coding.h" #include "util/crc32c.h" @@ -44,7 +45,12 @@ Writer::~Writer() { } } -IOStatus Writer::WriteBuffer() { return dest_->Flush(); } +IOStatus Writer::WriteBuffer() { + if (dest_->seen_error()) { + return IOStatus::IOError("Seen error. 
Skip writing buffer."); + } + return dest_->Flush(); +} IOStatus Writer::Close() { IOStatus s; diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 010136247..49b095937 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -281,7 +281,7 @@ void TestCustomizedTablePropertiesCollector( builder->Add(ikey.Encode(), kv.second); } ASSERT_OK(builder->Finish()); - writer->Flush(); + ASSERT_OK(writer->Flush()); // -- Step 2: Read properties test::StringSink* fwf = @@ -421,7 +421,7 @@ void TestInternalKeyPropertiesCollector( } ASSERT_OK(builder->Finish()); - writable->Flush(); + ASSERT_OK(writable->Flush()); test::StringSink* fwf = static_cast(writable->writable_file()); diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 410c2bc65..cedd6fe98 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -3226,7 +3226,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase, InternalKey ikey(info.key, 0, ValueType::kTypeValue); builder->Add(ikey.Encode(), "value"); ASSERT_OK(builder->Finish()); - fwriter->Flush(); + ASSERT_OK(fwriter->Flush()); uint64_t file_size = 0; s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); ASSERT_OK(s); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 3b98ea658..7056d6a2f 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -16,6 +16,7 @@ #include "monitoring/histogram.h" #include "monitoring/iostats_context_imp.h" #include "port/port.h" +#include "rocksdb/io_status.h" #include "rocksdb/system_clock.h" #include "test_util/sync_point.h" #include "util/crc32c.h" @@ -23,6 +24,13 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { +namespace { +IOStatus AssertFalseAndGetStatusForPrevError() { + assert(false); + return IOStatus::IOError("Writer has previous error."); +} +} // namespace + IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, const 
std::string& fname, const FileOptions& file_opts, @@ -43,6 +51,10 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -85,6 +97,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (buf_.CurrentSize() > 0) { s = Flush(op_rate_limiter_priority); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -165,12 +178,17 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (s.ok()) { uint64_t cur_size = filesize_.load(std::memory_order_acquire); filesize_.store(cur_size + data.size(), std::memory_order_release); + } else { + seen_error_ = true; } return s; } IOStatus WritableFileWriter::Pad(const size_t pad_bytes, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); @@ -186,6 +204,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, if (left > 0) { IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -203,17 +222,31 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, } IOStatus WritableFileWriter::Close() { + if (seen_error_) { + IOStatus interim; + if (writable_file_.get() != nullptr) { + interim = writable_file_->Close(IOOptions(), nullptr); + writable_file_.reset(); + } + if (interim.ok()) { + return IOStatus::IOError( + "File is closed but data not flushed as writer has previous error."); + } else { + return interim; + } + } + // Do not quit immediately on failure the file MUST be closed - IOStatus s; // Possible to close it twice now as we MUST close // in __dtor, simply flushing 
is not enough // Windows when pre-allocating does not fill with zeros // also with unbuffered access we also set the end of data. if (writable_file_.get() == nullptr) { - return s; + return IOStatus::OK(); } + IOStatus s; s = Flush(); // flush cache to OS IOStatus interim; @@ -294,9 +327,13 @@ IOStatus WritableFileWriter::Close() { writable_file_.reset(); TEST_KILL_RANDOM("WritableFileWriter::Close:1"); - if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) { - checksum_generator_->Finalize(); - checksum_finalized_ = true; + if (s.ok()) { + if (checksum_generator_ != nullptr && !checksum_finalized_) { + checksum_generator_->Finalize(); + checksum_finalized_ = true; + } + } else { + seen_error_ = true; } return s; @@ -305,6 +342,10 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2); @@ -329,6 +370,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } } if (!s.ok()) { + seen_error_ = true; return s; } } @@ -357,6 +399,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } if (!s.ok()) { + seen_error_ = true; return s; } @@ -383,6 +426,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { if (offset_sync_to > 0 && offset_sync_to - last_sync_size_ >= bytes_per_sync_) { s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); + if (!s.ok()) { + seen_error_ = true; + } last_sync_size_ = offset_sync_to; } } @@ -409,14 +455,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const { } IOStatus WritableFileWriter::Sync(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s = Flush(); if 
(!s.ok()) { + seen_error_ = true; return s; } TEST_KILL_RANDOM("WritableFileWriter::Sync:0"); if (!use_direct_io() && pending_sync_) { s = SyncInternal(use_fsync); if (!s.ok()) { + seen_error_ = true; return s; } } @@ -426,6 +478,10 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) { } IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + if (!writable_file_->IsSyncThreadSafe()) { return IOStatus::NotSupported( "Can't WritableFileWriter::SyncWithoutFlush() because " @@ -434,10 +490,17 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); IOStatus s = SyncInternal(use_fsync); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); @@ -473,10 +536,17 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { } #endif SetPerfLevel(prev_perf_level); + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOSTATS_TIMER_GUARD(range_sync_nanos); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); #ifndef ROCKSDB_LITE @@ -488,6 +558,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOOptions io_options; io_options.rate_limiter_priority = writable_file_->GetIOPriority(); IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); + if (!s.ok()) { + seen_error_ = true; + } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { auto finish_ts = std::chrono::steady_clock::now(); @@ -505,6 +578,10 @@ IOStatus 
WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // limiter if available IOStatus WritableFileWriter::WriteBuffered( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; assert(!use_direct_io()); const char* src = data; @@ -576,6 +653,7 @@ IOStatus WritableFileWriter::WriteBuffered( } #endif if (!s.ok()) { + seen_error_ = true; return s; } } @@ -590,11 +668,18 @@ IOStatus WritableFileWriter::WriteBuffered( } buf_.Size(0); buffered_data_crc32c_checksum_ = 0; + if (!s.ok()) { + seen_error_ = true; + } return s; } IOStatus WritableFileWriter::WriteBufferedWithChecksum( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + IOStatus s; assert(!use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); @@ -666,6 +751,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( // and let caller determine error handling. 
buf_.Size(0); buffered_data_crc32c_checksum_ = 0; + seen_error_ = true; return s; } } @@ -679,6 +765,9 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( buffered_data_crc32c_checksum_ = 0; uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); flushed_size_.store(cur_size + left, std::memory_order_release); + if (!s.ok()) { + seen_error_ = true; + } return s; } @@ -712,6 +801,12 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, #ifndef ROCKSDB_LITE IOStatus WritableFileWriter::WriteDirect( Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + assert(false); + + return IOStatus::IOError("Writer has previous error."); + } + assert(use_direct_io()); IOStatus s; const size_t alignment = buf_.Alignment(); @@ -778,6 +873,7 @@ IOStatus WritableFileWriter::WriteDirect( } if (!s.ok()) { buf_.Size(file_advance + leftover_tail); + seen_error_ = true; return s; } } @@ -801,12 +897,18 @@ IOStatus WritableFileWriter::WriteDirect( // is a multiple of whole pages otherwise filesize_ is leftover_tail // behind next_write_offset_ += file_advance; + } else { + seen_error_ = true; } return s; } IOStatus WritableFileWriter::WriteDirectWithChecksum( Env::IOPriority op_rate_limiter_priority) { + if (seen_error_) { + return AssertFalseAndGetStatusForPrevError(); + } + assert(use_direct_io()); assert(perform_data_verification_ && buffered_data_with_checksum_); IOStatus s; @@ -884,6 +986,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( buf_.Size(file_advance + leftover_tail); buffered_data_crc32c_checksum_ = crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + seen_error_ = true; return s; } } @@ -907,6 +1010,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( // is a multiple of whole pages otherwise filesize_ is leftover_tail // behind next_write_offset_ += file_advance; + } else { + seen_error_ = true; } return s; } diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 
bb9e5a6a1..8a0e52db4 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -151,6 +151,7 @@ class WritableFileWriter { uint64_t next_write_offset_; #endif // ROCKSDB_LITE bool pending_sync_; + bool seen_error_; uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; @@ -186,6 +187,7 @@ class WritableFileWriter { next_write_offset_(0), #endif // ROCKSDB_LITE pending_sync_(false), + seen_error_(false), last_sync_size_(0), bytes_per_sync_(options.bytes_per_sync), rate_limiter_(options.rate_limiter), @@ -288,6 +290,11 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; + bool seen_error() const { return seen_error_; } + // For options of relaxed consistency, users might hope to continue + // operating on the file after an error happens. + void reset_seen_error() { seen_error_ = false; } + private: // Decide the Rate Limiter priority. static Env::IOPriority DecideRateLimiterPriority( diff --git a/table/block_based/data_block_hash_index_test.cc b/table/block_based/data_block_hash_index_test.cc index 3f1aed7b5..f5fecfa23 100644 --- a/table/block_based/data_block_hash_index_test.cc +++ b/table/block_based/data_block_hash_index_test.cc @@ -567,7 +567,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2, EXPECT_TRUE(builder->status().ok()); Status s = builder->Finish(); - file_writer->Flush(); + ASSERT_OK(file_writer->Flush()); EXPECT_TRUE(s.ok()) << s.ToString(); EXPECT_EQ(sink->contents().size(), builder->FileSize()); diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index bf9c09244..6c4e2a9a0 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -895,12 +895,14 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) { fwf->CheckCounters(1, 0); ASSERT_EQ(listener->NotifyErrorCount(), 1); + file_writer->reset_seen_error(); fwf->SetIOError(true); ASSERT_NOK(file_writer->Flush()); fwf->CheckCounters(1, 1); 
ASSERT_EQ(listener->NotifyErrorCount(), 2); /* No error generation */ + file_writer->reset_seen_error(); fwf->SetIOError(false); ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b'))); ASSERT_EQ(listener->NotifyErrorCount(), 2);