From 4915f895138ba6e8730df58ac3f7258b29cd1452 Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 24 Aug 2022 07:30:52 -0700 Subject: [PATCH] WritableFileWriter to allow operation after failure when SyncWithoutFlush() is involved (#10555) Summary: https://github.com/facebook/rocksdb/pull/10489 adds an assertion in most functions in WritableFileWriter to check no previous error. However, it only works without calling SyncWithoutFlush(). The nature of SyncWithoutFlush() makes two concurrent call fails to check status code of each other and causing assertion failure. Fix the problem by skipping the check after SyncWithoutFlush() is called and not check status code in SyncWithoutFlush(). Since the original change was not officially released yet, the fix isn't added to HISTORY.md. Pull Request resolved: https://github.com/facebook/rocksdb/pull/10555 Test Plan: Make sure existing tests still pass Reviewed By: anand1976 Differential Revision: D38946208 fbshipit-source-id: 63566732d3f25c8a8342840499cf7b7d745f27c2 --- file/writable_file_writer.cc | 81 ++++++++++++++++-------------------- file/writable_file_writer.h | 24 +++++++++-- 2 files changed, 57 insertions(+), 48 deletions(-) diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 7056d6a2f..3afc51c56 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -24,13 +24,6 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -namespace { -IOStatus AssertFalseAndGetStatusForPrevError() { - assert(false); - return IOStatus::IOError("Writer has previous error."); -} -} // namespace - IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, @@ -51,7 +44,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -97,7 +90,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (buf_.CurrentSize() > 0) { s = Flush(op_rate_limiter_priority); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -179,14 +172,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, uint64_t cur_size = filesize_.load(std::memory_order_acquire); filesize_.store(cur_size + data.size(), std::memory_order_release); } else { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::Pad(const size_t pad_bytes, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } assert(pad_bytes < kDefaultPageSize); @@ -204,7 +197,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, if (left > 0) { IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -222,7 +215,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, } IOStatus WritableFileWriter::Close() { - if (seen_error_) { + if (seen_error()) { IOStatus interim; if (writable_file_.get() != nullptr) { interim = writable_file_->Close(IOOptions(), nullptr); @@ -333,7 +326,7 @@ IOStatus WritableFileWriter::Close() { checksum_finalized_ = true; } } else { - seen_error_ = true; + set_seen_error(); } return s; @@ -342,7 +335,7 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -370,7 +363,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } } if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -399,7 +392,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } @@ -427,7 +420,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { offset_sync_to - last_sync_size_ >= bytes_per_sync_) { s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } last_sync_size_ = offset_sync_to; } @@ -455,20 +448,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const { } IOStatus WritableFileWriter::Sync(bool use_fsync) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } IOStatus s = Flush(); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } TEST_KILL_RANDOM("WritableFileWriter::Sync:0"); if (!use_direct_io() && pending_sync_) { s = SyncInternal(use_fsync); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -478,10 +471,9 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) { } IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } - if (!writable_file_->IsSyncThreadSafe()) { return IOStatus::NotSupported( "Can't WritableFileWriter::SyncWithoutFlush() because " @@ -491,16 +483,16 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { IOStatus s = SyncInternal(use_fsync); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); if (!s.ok()) { - seen_error_ = true; +#ifndef NDEBUG + sync_without_flush_called_ = true; +#endif // NDEBUG + set_seen_error(); } return s; } IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { - if (seen_error_) { - return AssertFalseAndGetStatusForPrevError(); - } - + // Caller is supposed to check seen_error_ IOStatus s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); @@ -536,14 +528,13 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { } #endif SetPerfLevel(prev_perf_level); - if (!s.ok()) { - seen_error_ = true; - } + + // The caller will be responsible to call set_seen_error() if s is not OK. return s; } IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -559,7 +550,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { io_options.rate_limiter_priority = writable_file_->GetIOPriority(); IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -578,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // limiter if available IOStatus WritableFileWriter::WriteBuffered( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -653,7 +644,7 @@ IOStatus WritableFileWriter::WriteBuffered( } #endif if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -669,14 +660,14 @@ IOStatus WritableFileWriter::WriteBuffered( buf_.Size(0); buffered_data_crc32c_checksum_ = 0; if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::WriteBufferedWithChecksum( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -751,7 +742,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( // and let caller determine error handling. buf_.Size(0); buffered_data_crc32c_checksum_ = 0; - seen_error_ = true; + set_seen_error(); return s; } } @@ -766,7 +757,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); flushed_size_.store(cur_size + left, std::memory_order_release); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } return s; } @@ -801,7 +792,7 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, #ifndef ROCKSDB_LITE IOStatus WritableFileWriter::WriteDirect( Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { assert(false); return IOStatus::IOError("Writer has previous error."); @@ -873,7 +864,7 @@ IOStatus WritableFileWriter::WriteDirect( } if (!s.ok()) { buf_.Size(file_advance + leftover_tail); - seen_error_ = true; + set_seen_error(); return s; } } @@ -898,14 +889,14 @@ IOStatus WritableFileWriter::WriteDirect( // behind next_write_offset_ += file_advance; } else { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::WriteDirectWithChecksum( Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -986,7 +977,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( buf_.Size(file_advance + leftover_tail); buffered_data_crc32c_checksum_ = crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); - seen_error_ = true; + set_seen_error(); return s; } } @@ -1011,7 +1002,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( // behind next_write_offset_ += file_advance; } else { - seen_error_ = true; + set_seen_error(); } return s; } diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 8a0e52db4..d513e2bc1 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -151,7 +151,14 @@ class WritableFileWriter { uint64_t next_write_offset_; #endif // ROCKSDB_LITE bool pending_sync_; - bool seen_error_; + std::atomic seen_error_; +#ifndef NDEBUG + // SyncWithoutFlush() is the function that is allowed to be called + // concurrently with other function. One of the concurrent call + // could set seen_error_, and the other one would hit assertion + // in debug mode. + std::atomic sync_without_flush_called_ = false; +#endif // NDEBUG uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; @@ -290,10 +297,21 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; - bool seen_error() const { return seen_error_; } + bool seen_error() const { + return seen_error_.load(std::memory_order_relaxed); + } // For options of relaxed consistency, users might hope to continue // operating on the file after an error happens. - void reset_seen_error() { seen_error_ = false; } + void reset_seen_error() { + seen_error_.store(false, std::memory_order_relaxed); + } + void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); } + + IOStatus AssertFalseAndGetStatusForPrevError() { + // This should only happen if SyncWithoutFlush() was called. + assert(sync_without_flush_called_); + return IOStatus::IOError("Writer has previous error."); + } private: // Decide the Rate Limiter priority.