WritableFileWriter tries to skip operations after failure (#10489)

Summary:
A flag in WritableFileWriter is introduced to remember error has happened. Subsequent operations will fail with an assertion. Those operations, except Close() are not supposed to be called anyway. This change will help catch bug in tests and stress tests and limit damage of a potential bug of continue writing to a file after a failure.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10489

Test Plan: Fix existing unit tests and watch crash tests for a while.

Reviewed By: anand1976

Differential Revision: D38473277

fbshipit-source-id: 09aafb971e56cfd7f9ef92ad15b883f54acf1366
main
sdong 2 years ago committed by Facebook GitHub Bot
parent b57155a0bd
commit 911c0208b9
  1. 1
      HISTORY.md
  2. 44
      db/blob/blob_log_writer.cc
  3. 6
      db/db_impl/db_impl_compaction_flush.cc
  4. 7
      db/db_impl/db_impl_write.cc
  5. 8
      db/log_writer.cc
  6. 4
      db/table_properties_collector_test.cc
  7. 2
      db/version_set_test.cc
  8. 115
      file/writable_file_writer.cc
  9. 7
      file/writable_file_writer.h
  10. 2
      table/block_based/data_block_hash_index_test.cc
  11. 2
      util/file_reader_writer_test.cc

@ -31,6 +31,7 @@
* PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env. * PosixLogger is removed and by default EnvLogger will be used for info logging. The behavior of the two loggers should be very similar when using the default Posix Env.
* Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly. * Remove [min|max]_timestamp from VersionEdit for now since they are not tracked in MANIFEST anyway but consume two empty std::string (up to 64 bytes) for each file. Should they be added back in the future, we should store them more compactly.
* Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base. * Improve universal tiered storage compaction picker to avoid extra major compaction triggered by size amplification. If `preclude_last_level_data_seconds` is enabled, the size amplification is calculated within non last_level data only which skip the last level and use the penultimate level as the size base.
* If an error is hit when writing to a file (append, sync, etc), RocksDB is more strict with not issuing more operations to it, except closing the file, with exceptions of some WAL file operations in error recovery path.
### Performance Improvements ### Performance Improvements
* Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables. * Instead of constructing `FragmentedRangeTombstoneList` during every read operation, it is now constructed once and stored in immutable memtables. This improves speed of querying range tombstones from immutable memtables.

@ -70,32 +70,38 @@ Status BlobLogWriter::AppendFooter(BlobLogFooter& footer,
std::string str; std::string str;
footer.EncodeTo(&str); footer.EncodeTo(&str);
Status s = dest_->Append(Slice(str)); Status s;
if (s.ok()) { if (dest_->seen_error()) {
block_offset_ += str.size(); s.PermitUncheckedError();
return Status::IOError("Seen Error. Skip closing.");
s = Sync(); } else {
s = dest_->Append(Slice(str));
if (s.ok()) { if (s.ok()) {
s = dest_->Close(); block_offset_ += str.size();
s = Sync();
if (s.ok()) { if (s.ok()) {
assert(!!checksum_method == !!checksum_value); s = dest_->Close();
if (checksum_method) { if (s.ok()) {
assert(checksum_method->empty()); assert(!!checksum_method == !!checksum_value);
std::string method = dest_->GetFileChecksumFuncName(); if (checksum_method) {
if (method != kUnknownFileChecksumFuncName) { assert(checksum_method->empty());
*checksum_method = std::move(method);
std::string method = dest_->GetFileChecksumFuncName();
if (method != kUnknownFileChecksumFuncName) {
*checksum_method = std::move(method);
}
} }
} if (checksum_value) {
if (checksum_value) { assert(checksum_value->empty());
assert(checksum_value->empty());
std::string value = dest_->GetFileChecksum(); std::string value = dest_->GetFileChecksum();
if (value != kUnknownFileChecksum) { if (value != kUnknownFileChecksum) {
*checksum_value = std::move(value); *checksum_value = std::move(value);
}
} }
} }
} }

@ -109,12 +109,18 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context,
ROCKS_LOG_INFO(immutable_db_options_.info_log, ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[JOB %d] Syncing log #%" PRIu64, job_context->job_id, "[JOB %d] Syncing log #%" PRIu64, job_context->job_id,
log->get_log_number()); log->get_log_number());
if (error_handler_.IsRecoveryInProgress()) {
log->file()->reset_seen_error();
}
io_s = log->file()->Sync(immutable_db_options_.use_fsync); io_s = log->file()->Sync(immutable_db_options_.use_fsync);
if (!io_s.ok()) { if (!io_s.ok()) {
break; break;
} }
if (immutable_db_options_.recycle_log_file_num > 0) { if (immutable_db_options_.recycle_log_file_num > 0) {
if (error_handler_.IsRecoveryInProgress()) {
log->file()->reset_seen_error();
}
io_s = log->Close(); io_s = log->Close();
if (!io_s.ok()) { if (!io_s.ok()) {
break; break;

@ -1089,6 +1089,9 @@ void DBImpl::IOStatusCheck(const IOStatus& io_status) {
// Maybe change the return status to void? // Maybe change the return status to void?
error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback);
mutex_.Unlock(); mutex_.Unlock();
} else {
// Force writable file to be continue writable.
logs_.back().writer->file()->reset_seen_error();
} }
} }
@ -2084,6 +2087,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
if (!logs_.empty()) { if (!logs_.empty()) {
// Alway flush the buffer of the last log before switching to a new one // Alway flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer; log::Writer* cur_log_writer = logs_.back().writer;
if (error_handler_.IsRecoveryInProgress()) {
// In recovery path, we force another try of writing WAL buffer.
cur_log_writer->file()->reset_seen_error();
}
io_s = cur_log_writer->WriteBuffer(); io_s = cur_log_writer->WriteBuffer();
if (s.ok()) { if (s.ok()) {
s = io_s; s = io_s;

@ -13,6 +13,7 @@
#include "file/writable_file_writer.h" #include "file/writable_file_writer.h"
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/io_status.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
@ -44,7 +45,12 @@ Writer::~Writer() {
} }
} }
IOStatus Writer::WriteBuffer() { return dest_->Flush(); } IOStatus Writer::WriteBuffer() {
if (dest_->seen_error()) {
return IOStatus::IOError("Seen error. Skip writing buffer.");
}
return dest_->Flush();
}
IOStatus Writer::Close() { IOStatus Writer::Close() {
IOStatus s; IOStatus s;

@ -281,7 +281,7 @@ void TestCustomizedTablePropertiesCollector(
builder->Add(ikey.Encode(), kv.second); builder->Add(ikey.Encode(), kv.second);
} }
ASSERT_OK(builder->Finish()); ASSERT_OK(builder->Finish());
writer->Flush(); ASSERT_OK(writer->Flush());
// -- Step 2: Read properties // -- Step 2: Read properties
test::StringSink* fwf = test::StringSink* fwf =
@ -421,7 +421,7 @@ void TestInternalKeyPropertiesCollector(
} }
ASSERT_OK(builder->Finish()); ASSERT_OK(builder->Finish());
writable->Flush(); ASSERT_OK(writable->Flush());
test::StringSink* fwf = test::StringSink* fwf =
static_cast<test::StringSink*>(writable->writable_file()); static_cast<test::StringSink*>(writable->writable_file());

@ -3226,7 +3226,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
InternalKey ikey(info.key, 0, ValueType::kTypeValue); InternalKey ikey(info.key, 0, ValueType::kTypeValue);
builder->Add(ikey.Encode(), "value"); builder->Add(ikey.Encode(), "value");
ASSERT_OK(builder->Finish()); ASSERT_OK(builder->Finish());
fwriter->Flush(); ASSERT_OK(fwriter->Flush());
uint64_t file_size = 0; uint64_t file_size = 0;
s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr); s = fs_->GetFileSize(fname, IOOptions(), &file_size, nullptr);
ASSERT_OK(s); ASSERT_OK(s);

@ -16,6 +16,7 @@
#include "monitoring/histogram.h" #include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h" #include "monitoring/iostats_context_imp.h"
#include "port/port.h" #include "port/port.h"
#include "rocksdb/io_status.h"
#include "rocksdb/system_clock.h" #include "rocksdb/system_clock.h"
#include "test_util/sync_point.h" #include "test_util/sync_point.h"
#include "util/crc32c.h" #include "util/crc32c.h"
@ -23,6 +24,13 @@
#include "util/rate_limiter.h" #include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace {
IOStatus AssertFalseAndGetStatusForPrevError() {
assert(false);
return IOStatus::IOError("Writer has previous error.");
}
} // namespace
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs, IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
const std::string& fname, const std::string& fname,
const FileOptions& file_opts, const FileOptions& file_opts,
@ -43,6 +51,10 @@ IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum, IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
Env::IOPriority op_rate_limiter_priority) { Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
const char* src = data.data(); const char* src = data.data();
size_t left = data.size(); size_t left = data.size();
IOStatus s; IOStatus s;
@ -85,6 +97,7 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
if (buf_.CurrentSize() > 0) { if (buf_.CurrentSize() > 0) {
s = Flush(op_rate_limiter_priority); s = Flush(op_rate_limiter_priority);
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
} }
@ -165,12 +178,17 @@ IOStatus WritableFileWriter::Append(const Slice& data, uint32_t crc32c_checksum,
if (s.ok()) { if (s.ok()) {
uint64_t cur_size = filesize_.load(std::memory_order_acquire); uint64_t cur_size = filesize_.load(std::memory_order_acquire);
filesize_.store(cur_size + data.size(), std::memory_order_release); filesize_.store(cur_size + data.size(), std::memory_order_release);
} else {
seen_error_ = true;
} }
return s; return s;
} }
IOStatus WritableFileWriter::Pad(const size_t pad_bytes, IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
Env::IOPriority op_rate_limiter_priority) { Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
assert(pad_bytes < kDefaultPageSize); assert(pad_bytes < kDefaultPageSize);
size_t left = pad_bytes; size_t left = pad_bytes;
size_t cap = buf_.Capacity() - buf_.CurrentSize(); size_t cap = buf_.Capacity() - buf_.CurrentSize();
@ -186,6 +204,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
if (left > 0) { if (left > 0) {
IOStatus s = Flush(op_rate_limiter_priority); IOStatus s = Flush(op_rate_limiter_priority);
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
} }
@ -203,17 +222,31 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes,
} }
IOStatus WritableFileWriter::Close() { IOStatus WritableFileWriter::Close() {
if (seen_error_) {
IOStatus interim;
if (writable_file_.get() != nullptr) {
interim = writable_file_->Close(IOOptions(), nullptr);
writable_file_.reset();
}
if (interim.ok()) {
return IOStatus::IOError(
"File is closed but data not flushed as writer has previous error.");
} else {
return interim;
}
}
// Do not quit immediately on failure the file MUST be closed // Do not quit immediately on failure the file MUST be closed
IOStatus s;
// Possible to close it twice now as we MUST close // Possible to close it twice now as we MUST close
// in __dtor, simply flushing is not enough // in __dtor, simply flushing is not enough
// Windows when pre-allocating does not fill with zeros // Windows when pre-allocating does not fill with zeros
// also with unbuffered access we also set the end of data. // also with unbuffered access we also set the end of data.
if (writable_file_.get() == nullptr) { if (writable_file_.get() == nullptr) {
return s; return IOStatus::OK();
} }
IOStatus s;
s = Flush(); // flush cache to OS s = Flush(); // flush cache to OS
IOStatus interim; IOStatus interim;
@ -294,9 +327,13 @@ IOStatus WritableFileWriter::Close() {
writable_file_.reset(); writable_file_.reset();
TEST_KILL_RANDOM("WritableFileWriter::Close:1"); TEST_KILL_RANDOM("WritableFileWriter::Close:1");
if (s.ok() && checksum_generator_ != nullptr && !checksum_finalized_) { if (s.ok()) {
checksum_generator_->Finalize(); if (checksum_generator_ != nullptr && !checksum_finalized_) {
checksum_finalized_ = true; checksum_generator_->Finalize();
checksum_finalized_ = true;
}
} else {
seen_error_ = true;
} }
return s; return s;
@ -305,6 +342,10 @@ IOStatus WritableFileWriter::Close() {
// write out the cached data to the OS cache or storage if direct I/O // write out the cached data to the OS cache or storage if direct I/O
// enabled // enabled
IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) { IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s; IOStatus s;
TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2); TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
@ -329,6 +370,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
} }
} }
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
} }
@ -357,6 +399,7 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
} }
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
@ -383,6 +426,9 @@ IOStatus WritableFileWriter::Flush(Env::IOPriority op_rate_limiter_priority) {
if (offset_sync_to > 0 && if (offset_sync_to > 0 &&
offset_sync_to - last_sync_size_ >= bytes_per_sync_) { offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_); s = RangeSync(last_sync_size_, offset_sync_to - last_sync_size_);
if (!s.ok()) {
seen_error_ = true;
}
last_sync_size_ = offset_sync_to; last_sync_size_ = offset_sync_to;
} }
} }
@ -409,14 +455,20 @@ const char* WritableFileWriter::GetFileChecksumFuncName() const {
} }
IOStatus WritableFileWriter::Sync(bool use_fsync) { IOStatus WritableFileWriter::Sync(bool use_fsync) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s = Flush(); IOStatus s = Flush();
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
TEST_KILL_RANDOM("WritableFileWriter::Sync:0"); TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
if (!use_direct_io() && pending_sync_) { if (!use_direct_io() && pending_sync_) {
s = SyncInternal(use_fsync); s = SyncInternal(use_fsync);
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
} }
@ -426,6 +478,10 @@ IOStatus WritableFileWriter::Sync(bool use_fsync) {
} }
IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) { IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
if (!writable_file_->IsSyncThreadSafe()) { if (!writable_file_->IsSyncThreadSafe()) {
return IOStatus::NotSupported( return IOStatus::NotSupported(
"Can't WritableFileWriter::SyncWithoutFlush() because " "Can't WritableFileWriter::SyncWithoutFlush() because "
@ -434,10 +490,17 @@ IOStatus WritableFileWriter::SyncWithoutFlush(bool use_fsync) {
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1"); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
IOStatus s = SyncInternal(use_fsync); IOStatus s = SyncInternal(use_fsync);
TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2"); TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
if (!s.ok()) {
seen_error_ = true;
}
return s; return s;
} }
IOStatus WritableFileWriter::SyncInternal(bool use_fsync) { IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s; IOStatus s;
IOSTATS_TIMER_GUARD(fsync_nanos); IOSTATS_TIMER_GUARD(fsync_nanos);
TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0"); TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
@ -473,10 +536,17 @@ IOStatus WritableFileWriter::SyncInternal(bool use_fsync) {
} }
#endif #endif
SetPerfLevel(prev_perf_level); SetPerfLevel(prev_perf_level);
if (!s.ok()) {
seen_error_ = true;
}
return s; return s;
} }
IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) { IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOSTATS_TIMER_GUARD(range_sync_nanos); IOSTATS_TIMER_GUARD(range_sync_nanos);
TEST_SYNC_POINT("WritableFileWriter::RangeSync:0"); TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -488,6 +558,9 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
IOOptions io_options; IOOptions io_options;
io_options.rate_limiter_priority = writable_file_->GetIOPriority(); io_options.rate_limiter_priority = writable_file_->GetIOPriority();
IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr); IOStatus s = writable_file_->RangeSync(offset, nbytes, io_options, nullptr);
if (!s.ok()) {
seen_error_ = true;
}
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
if (ShouldNotifyListeners()) { if (ShouldNotifyListeners()) {
auto finish_ts = std::chrono::steady_clock::now(); auto finish_ts = std::chrono::steady_clock::now();
@ -505,6 +578,10 @@ IOStatus WritableFileWriter::RangeSync(uint64_t offset, uint64_t nbytes) {
// limiter if available // limiter if available
IOStatus WritableFileWriter::WriteBuffered( IOStatus WritableFileWriter::WriteBuffered(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s; IOStatus s;
assert(!use_direct_io()); assert(!use_direct_io());
const char* src = data; const char* src = data;
@ -576,6 +653,7 @@ IOStatus WritableFileWriter::WriteBuffered(
} }
#endif #endif
if (!s.ok()) { if (!s.ok()) {
seen_error_ = true;
return s; return s;
} }
} }
@ -590,11 +668,18 @@ IOStatus WritableFileWriter::WriteBuffered(
} }
buf_.Size(0); buf_.Size(0);
buffered_data_crc32c_checksum_ = 0; buffered_data_crc32c_checksum_ = 0;
if (!s.ok()) {
seen_error_ = true;
}
return s; return s;
} }
IOStatus WritableFileWriter::WriteBufferedWithChecksum( IOStatus WritableFileWriter::WriteBufferedWithChecksum(
const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) { const char* data, size_t size, Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
IOStatus s; IOStatus s;
assert(!use_direct_io()); assert(!use_direct_io());
assert(perform_data_verification_ && buffered_data_with_checksum_); assert(perform_data_verification_ && buffered_data_with_checksum_);
@ -666,6 +751,7 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
// and let caller determine error handling. // and let caller determine error handling.
buf_.Size(0); buf_.Size(0);
buffered_data_crc32c_checksum_ = 0; buffered_data_crc32c_checksum_ = 0;
seen_error_ = true;
return s; return s;
} }
} }
@ -679,6 +765,9 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(
buffered_data_crc32c_checksum_ = 0; buffered_data_crc32c_checksum_ = 0;
uint64_t cur_size = flushed_size_.load(std::memory_order_acquire); uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
flushed_size_.store(cur_size + left, std::memory_order_release); flushed_size_.store(cur_size + left, std::memory_order_release);
if (!s.ok()) {
seen_error_ = true;
}
return s; return s;
} }
@ -712,6 +801,12 @@ void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
IOStatus WritableFileWriter::WriteDirect( IOStatus WritableFileWriter::WriteDirect(
Env::IOPriority op_rate_limiter_priority) { Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
assert(false);
return IOStatus::IOError("Writer has previous error.");
}
assert(use_direct_io()); assert(use_direct_io());
IOStatus s; IOStatus s;
const size_t alignment = buf_.Alignment(); const size_t alignment = buf_.Alignment();
@ -778,6 +873,7 @@ IOStatus WritableFileWriter::WriteDirect(
} }
if (!s.ok()) { if (!s.ok()) {
buf_.Size(file_advance + leftover_tail); buf_.Size(file_advance + leftover_tail);
seen_error_ = true;
return s; return s;
} }
} }
@ -801,12 +897,18 @@ IOStatus WritableFileWriter::WriteDirect(
// is a multiple of whole pages otherwise filesize_ is leftover_tail // is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind // behind
next_write_offset_ += file_advance; next_write_offset_ += file_advance;
} else {
seen_error_ = true;
} }
return s; return s;
} }
IOStatus WritableFileWriter::WriteDirectWithChecksum( IOStatus WritableFileWriter::WriteDirectWithChecksum(
Env::IOPriority op_rate_limiter_priority) { Env::IOPriority op_rate_limiter_priority) {
if (seen_error_) {
return AssertFalseAndGetStatusForPrevError();
}
assert(use_direct_io()); assert(use_direct_io());
assert(perform_data_verification_ && buffered_data_with_checksum_); assert(perform_data_verification_ && buffered_data_with_checksum_);
IOStatus s; IOStatus s;
@ -884,6 +986,7 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
buf_.Size(file_advance + leftover_tail); buf_.Size(file_advance + leftover_tail);
buffered_data_crc32c_checksum_ = buffered_data_crc32c_checksum_ =
crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
seen_error_ = true;
return s; return s;
} }
} }
@ -907,6 +1010,8 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum(
// is a multiple of whole pages otherwise filesize_ is leftover_tail // is a multiple of whole pages otherwise filesize_ is leftover_tail
// behind // behind
next_write_offset_ += file_advance; next_write_offset_ += file_advance;
} else {
seen_error_ = true;
} }
return s; return s;
} }

@ -151,6 +151,7 @@ class WritableFileWriter {
uint64_t next_write_offset_; uint64_t next_write_offset_;
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
bool pending_sync_; bool pending_sync_;
bool seen_error_;
uint64_t last_sync_size_; uint64_t last_sync_size_;
uint64_t bytes_per_sync_; uint64_t bytes_per_sync_;
RateLimiter* rate_limiter_; RateLimiter* rate_limiter_;
@ -186,6 +187,7 @@ class WritableFileWriter {
next_write_offset_(0), next_write_offset_(0),
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
pending_sync_(false), pending_sync_(false),
seen_error_(false),
last_sync_size_(0), last_sync_size_(0),
bytes_per_sync_(options.bytes_per_sync), bytes_per_sync_(options.bytes_per_sync),
rate_limiter_(options.rate_limiter), rate_limiter_(options.rate_limiter),
@ -288,6 +290,11 @@ class WritableFileWriter {
const char* GetFileChecksumFuncName() const; const char* GetFileChecksumFuncName() const;
bool seen_error() const { return seen_error_; }
// For options of relaxed consistency, users might hope to continue
// operating on the file after an error happens.
void reset_seen_error() { seen_error_ = false; }
private: private:
// Decide the Rate Limiter priority. // Decide the Rate Limiter priority.
static Env::IOPriority DecideRateLimiterPriority( static Env::IOPriority DecideRateLimiterPriority(

@ -567,7 +567,7 @@ void TestBoundary(InternalKey& ik1, std::string& v1, InternalKey& ik2,
EXPECT_TRUE(builder->status().ok()); EXPECT_TRUE(builder->status().ok());
Status s = builder->Finish(); Status s = builder->Finish();
file_writer->Flush(); ASSERT_OK(file_writer->Flush());
EXPECT_TRUE(s.ok()) << s.ToString(); EXPECT_TRUE(s.ok()) << s.ToString();
EXPECT_EQ(sink->contents().size(), builder->FileSize()); EXPECT_EQ(sink->contents().size(), builder->FileSize());

@ -895,12 +895,14 @@ TEST_F(DBWritableFileWriterTest, IOErrorNotification) {
fwf->CheckCounters(1, 0); fwf->CheckCounters(1, 0);
ASSERT_EQ(listener->NotifyErrorCount(), 1); ASSERT_EQ(listener->NotifyErrorCount(), 1);
file_writer->reset_seen_error();
fwf->SetIOError(true); fwf->SetIOError(true);
ASSERT_NOK(file_writer->Flush()); ASSERT_NOK(file_writer->Flush());
fwf->CheckCounters(1, 1); fwf->CheckCounters(1, 1);
ASSERT_EQ(listener->NotifyErrorCount(), 2); ASSERT_EQ(listener->NotifyErrorCount(), 2);
/* No error generation */ /* No error generation */
file_writer->reset_seen_error();
fwf->SetIOError(false); fwf->SetIOError(false);
ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b'))); ASSERT_OK(file_writer->Append(std::string(2 * kMb, 'b')));
ASSERT_EQ(listener->NotifyErrorCount(), 2); ASSERT_EQ(listener->NotifyErrorCount(), 2);

Loading…
Cancel
Save