diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 7056d6a2f..3afc51c56 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -24,13 +24,6 @@ #include "util/rate_limiter.h" namespace ROCKSDB_NAMESPACE { -namespace { -IOStatus AssertFalseAndGetStatusForPrevError() { - assert(false); - return IOStatus::IOError("Writer has previous error."); -} -} // namespace - IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, const std::string& fname, const FileOptions& file_opts, @@ -51,7 +44,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr& fs, IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -97,7 +90,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, if (buf_.CurrentSize() > 0) { s = Flush(op_rate_limiter_priority); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -179,14 +172,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, uint64_t cur_size = filesize_.load(std::memory_order_acquire); filesize_.store(cur_size + data.size(), std::memory_order_release); } else { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::Pad(const size_t pad_bytes, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } assert(pad_bytes < kDefaultPageSize); @@ -204,7 +197,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, if (left > 0) { IOStatus s = Flush(op_rate_limiter_priority); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -222,7 +215,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes, } IOStatus WritableFileWriter::Close() { - if (seen_error_) { + if (seen_error()) { IOStatus interim; if (writable_file_.get() != nullptr) { interim = writable_file_->Close(IOOptions(), nullptr); @@ -333,7 +326,7 @@ IOStatus WritableFileWriter::Close() { checksum_finalized_ = true; } } else { - seen_error_ = true; + set_seen_error(); } return s; @@ -342,7 +335,7 @@ IOStatus WritableFileWriter::Close() { // write out the cached data to the OS cache or storage if direct I/O // enabled IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -370,7 +363,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } } if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -399,7 +392,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { } if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } @@ -427,7 +420,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { offset_sync_to - last_sync_size_ >= bytes_per_sync_) { s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } last_sync_size_ = offset_sync_to; } @@ -455,20 +448,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const { } IOStatus WritableFileWriter::Sync(bool use_fsync) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } IOStatus s = Flush(); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } TEST_KILL_RANDOM("WritableFileWriter::Sync:0"); if (!use_direct_io() && pending_sync_) { s = SyncInternal(use_fsync); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -478,10 +471,9 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) { } IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } - if (!writable_file_->IsSyncThreadSafe()) { return IOStatus::NotSupported( "Can't WritableFileWriter::SyncWithoutFlush() because " @@ -491,16 +483,16 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { IOStatus s = SyncInternal(use_fsync); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); if (!s.ok()) { - seen_error_ = true; +#ifndef NDEBUG + sync_without_flush_called_ = true; +#endif // NDEBUG + set_seen_error(); } return s; } IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { - if (seen_error_) { - return AssertFalseAndGetStatusForPrevError(); - } - + // Caller is supposed to check seen_error_ IOStatus s; IOSTATS_TIMER_GUARD(fsync_nanos); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); @@ -536,14 +528,13 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { } #endif SetPerfLevel(prev_perf_level); - if (!s.ok()) { - seen_error_ = true; - } + + // The caller will be responsible to call set_seen_error() if s is not OK. return s; } IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -559,7 +550,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { io_options.rate_limiter_priority = writable_file_->GetIOPriority(); IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } #ifndef ROCKSDB_LITE if (ShouldNotifyListeners()) { @@ -578,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { // limiter if available IOStatus WritableFileWriter::WriteBuffered( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -653,7 +644,7 @@ IOStatus WritableFileWriter::WriteBuffered( } #endif if (!s.ok()) { - seen_error_ = true; + set_seen_error(); return s; } } @@ -669,14 +660,14 @@ IOStatus WritableFileWriter::WriteBuffered( buf_.Size(0); buffered_data_crc32c_checksum_ = 0; if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::WriteBufferedWithChecksum( const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -751,7 +742,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( // and let caller determine error handling. buf_.Size(0); buffered_data_crc32c_checksum_ = 0; - seen_error_ = true; + set_seen_error(); return s; } } @@ -766,7 +757,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum( uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); flushed_size_.store(cur_size + left, std::memory_order_release); if (!s.ok()) { - seen_error_ = true; + set_seen_error(); } return s; } @@ -801,7 +792,7 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data, #ifndef ROCKSDB_LITE IOStatus WritableFileWriter::WriteDirect( Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { assert(false); return IOStatus::IOError("Writer has previous error."); @@ -873,7 +864,7 @@ IOStatus WritableFileWriter::WriteDirect( } if (!s.ok()) { buf_.Size(file_advance + leftover_tail); - seen_error_ = true; + set_seen_error(); return s; } } @@ -898,14 +889,14 @@ IOStatus WritableFileWriter::WriteDirect( // behind next_write_offset_ += file_advance; } else { - seen_error_ = true; + set_seen_error(); } return s; } IOStatus WritableFileWriter::WriteDirectWithChecksum( Env::IOPriority op_rate_limiter_priority) { - if (seen_error_) { + if (seen_error()) { return AssertFalseAndGetStatusForPrevError(); } @@ -986,7 +977,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( buf_.Size(file_advance + leftover_tail); buffered_data_crc32c_checksum_ = crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); - seen_error_ = true; + set_seen_error(); return s; } } @@ -1011,7 +1002,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum( // behind next_write_offset_ += file_advance; } else { - seen_error_ = true; + set_seen_error(); } return s; } diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index 8a0e52db4..d513e2bc1 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -151,7 +151,14 @@ class WritableFileWriter { uint64_t next_write_offset_; #endif // ROCKSDB_LITE bool pending_sync_; - bool seen_error_; + std::atomic seen_error_; +#ifndef NDEBUG + // SyncWithoutFlush() is the function that is allowed to be called + // concurrently with other function. One of the concurrent call + // could set seen_error_, and the other one would hit assertion + // in debug mode. + std::atomic sync_without_flush_called_ = false; +#endif // NDEBUG uint64_t last_sync_size_; uint64_t bytes_per_sync_; RateLimiter* rate_limiter_; @@ -290,10 +297,21 @@ class WritableFileWriter { const char* GetFileChecksumFuncName() const; - bool seen_error() const { return seen_error_; } + bool seen_error() const { + return seen_error_.load(std::memory_order_relaxed); + } // For options of relaxed consistency, users might hope to continue // operating on the file after an error happens. - void reset_seen_error() { seen_error_ = false; } + void reset_seen_error() { + seen_error_.store(false, std::memory_order_relaxed); + } + void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); } + + IOStatus AssertFalseAndGetStatusForPrevError() { + // This should only happen if SyncWithoutFlush() was called. + assert(sync_without_flush_called_); + return IOStatus::IOError("Writer has previous error."); + } private: // Decide the Rate Limiter priority.