WritableFileWriter to allow operation after failure when SyncWithoutFlush() is involved (#10555)

Summary:
https://github.com/facebook/rocksdb/pull/10489 adds an assertion in most functions in WritableFileWriter to check no previous error. However, it only works without calling SyncWithoutFlush(). The nature of SyncWithoutFlush() makes two concurrent call fails to check status code of each other and causing assertion failure. Fix the problem by skipping the check after SyncWithoutFlush() is called and not check status code in SyncWithoutFlush().

Since the original change was not officially released yet, the fix isn't added to HISTORY.md.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10555

Test Plan: Make sure existing tests still pass

Reviewed By: anand1976

Differential Revision: D38946208

fbshipit-source-id: 63566732d3f25c8a8342840499cf7b7d745f27c2
main
sdong 2 years ago committed by Facebook GitHub Bot
parent 198e5d8ee9
commit 4915f89513
  1. 81
      file/writable_file_writer.cc
  2. 24
      file/writable_file_writer.h

@ -24,13 +24,6 @@
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
namespace {
IOStatus AssertFalseAndGetStatusForPrevError() {
assert(false);
return IOStatus::IOError("Writer has previous error.");
}
} // namespace
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname,
const FileOptions& file_opts,
@ -51,7 +44,7 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -97,7 +90,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
if (buf_.CurrentSize() > 0) {
s = Flush(op_rate_limiter_priority);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -179,14 +172,14 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + data.size(), std::memory_order_release);
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}
IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
assert(pad_bytes < kDefaultPageSize);
@ -204,7 +197,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
if (left > 0) {
IOStatus s = Flush(op_rate_limiter_priority);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -222,7 +215,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
}
IOStatus WritableFileWriter::Close() {
if (seen_error_) {
if (seen_error()) {
IOStatus interim;
if (writable_file_.get() != nullptr) {
interim = writable_file_->Close(IOOptions(), nullptr);
@ -333,7 +326,7 @@ IOStatus WritableFileWriter::Close() {
checksum_finalized_ = true;
}
} else {
seen_error_ = true;
set_seen_error();
}
return s;
@ -342,7 +335,7 @@ IOStatus WritableFileWriter::Close() {
// write out the cached data to the OS cache or storage if direct I/O
// enabled
IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -370,7 +363,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
}
}
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -399,7 +392,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
}
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
@ -427,7 +420,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
last_sync_size_ = offset_sync_to;
}
@ -455,20 +448,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const {
}
IOStatus WritableFileWriter::Sync(bool use_fsync) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s = Flush();
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
if (!use_direct_io() && pending_sync_) {
s = SyncInternal(use_fsync);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -478,10 +471,9 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) {
}
IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
if (!writable_file_->IsSyncThreadSafe()) {
return IOStatus::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because "
@ -491,16 +483,16 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
IOStatus s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
if (!s.ok()) {
seen_error_ = true;
#ifndef NDEBUG
sync_without_flush_called_ = true;
#endif // NDEBUG
set_seen_error();
}
return s;
}
IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
// Caller is supposed to check seen_error_
IOStatus s;
IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
@ -536,14 +528,13 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
}
#endif
SetPerfLevel(prev_perf_level);
if (!s.ok()) {
seen_error_ = true;
}
// The caller will be responsible to call set_seen_error() if s is not OK.
return s;
}
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -559,7 +550,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
io_options.rate_limiter_priority = writable_file_->GetIOPriority();
IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
#ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) {
@ -578,7 +569,7 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
// limiter if available
IOStatus WritableFileWriter::WriteBuffered(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -653,7 +644,7 @@ IOStatus WritableFileWriter::WriteBuffered(
}
#endif
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -669,14 +660,14 @@ IOStatus WritableFileWriter::WriteBuffered(
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
return s;
}
IOStatus WritableFileWriter::WriteBufferedWithChecksum(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -751,7 +742,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
// and let caller determine error handling.
buf_.Size(0);
buffered_data_crc32c_checksum_ = 0;
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -766,7 +757,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + left, std::memory_order_release);
if (!s.ok()) {
seen_error_ = true;
set_seen_error();
}
return s;
}
@ -801,7 +792,7 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
#ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect(
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
assert(false);
return IOStatus::IOError("Writer has previous error.");
@ -873,7 +864,7 @@ IOStatus WritableFileWriter::WriteDirect(
}
if (!s.ok()) {
buf_.Size(file_advance + leftover_tail);
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -898,14 +889,14 @@ IOStatus WritableFileWriter::WriteDirect(
// behind
next_write_offset_ += file_advance;
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}
IOStatus WritableFileWriter::WriteDirectWithChecksum(
Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
if (seen_error()) {
return AssertFalseAndGetStatusForPrevError();
}
@ -986,7 +977,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
buf_.Size(file_advance + leftover_tail);
buffered_data_crc32c_checksum_ =
crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
seen_error_ = true;
set_seen_error();
return s;
}
}
@ -1011,7 +1002,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
// behind
next_write_offset_ += file_advance;
} else {
seen_error_ = true;
set_seen_error();
}
return s;
}

@ -151,7 +151,14 @@ class WritableFileWriter {
uint64_t next_write_offset_;
#endif // ROCKSDB_LITE
bool pending_sync_;
bool seen_error_;
std::atomic<bool> seen_error_;
#ifndef NDEBUG
// SyncWithoutFlush() is the function that is allowed to be called
// concurrently with other function. One of the concurrent call
// could set seen_error_, and the other one would hit assertion
// in debug mode.
std::atomic<bool> sync_without_flush_called_ = false;
#endif // NDEBUG
uint64_t last_sync_size_;
uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_;
@ -290,10 +297,21 @@ class WritableFileWriter {
const char* GetFileChecksumFuncName() const;
bool seen_error() const { return seen_error_; }
bool seen_error() const {
return seen_error_.load(std::memory_order_relaxed);
}
// For options of relaxed consistency, users might hope to continue
// operating on the file after an error happens.
void reset_seen_error() { seen_error_ = false; }
void reset_seen_error() {
seen_error_.store(false, std::memory_order_relaxed);
}
void set_seen_error() { seen_error_.store(true, std::memory_order_relaxed); }
IOStatus AssertFalseAndGetStatusForPrevError() {
// This should only happen if SyncWithoutFlush() was called.
assert(sync_without_flush_called_);
return IOStatus::IOError("Writer has previous error.");
}
private:
// Decide the Rate Limiter priority.

Loading…
Cancel
Save