diff --git a/HISTORY.md b/HISTORY.md index 97122ab74..96eb2ebc8 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -7,6 +7,7 @@ * Fixed a bug in rocksdb automatic implicit prefetching which got broken because of new feature adaptive_readahead and internal prefetching got disabled when iterator moves from one file to next. * Fixed a bug in TableOptions.prepopulate_block_cache which causes segmentation fault when used with TableOptions.partition_filters = true and TableOptions.cache_index_and_filter_blocks = true. * Fixed a bug affecting custom memtable factories which are not registered with the `ObjectRegistry`. The bug could result in failure to save the OPTIONS file. +* Fixed a bug causing two duplicate entries to be appended to a file opened in non-direct mode and tracked by `FaultInjectionTestFS`. ### Behavior Changes * MemTableList::TrimHistory now use allocated bytes when max_write_buffer_size_to_maintain > 0(default in TrasactionDB, introduced in PR#5022) Fix #8371. diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 86f018f4d..de675e691 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -28,6 +28,9 @@ #include "util/mutexlock.h" #include "util/random.h" #include "utilities/fault_injection_env.h" +#ifndef NDEBUG +#include "utilities/fault_injection_fs.h" +#endif namespace ROCKSDB_NAMESPACE { @@ -58,7 +61,6 @@ class FaultInjectionTest bool sequential_order_; - protected: public: enum ExpectedVerifResult { kValExpectFound, kValExpectNoError }; enum ResetMethod { @@ -544,6 +546,76 @@ TEST_P(FaultInjectionTest, WriteBatchWalTerminationTest) { ASSERT_EQ(db_->Get(ro, "boys", &val), Status::NotFound()); } +TEST_P(FaultInjectionTest, NoDuplicateTrailingEntries) { + auto fault_fs = std::make_shared<FaultInjectionTestFS>(FileSystem::Default()); + fault_fs->EnableWriteErrorInjection(); + fault_fs->SetFilesystemDirectWritable(false); + const std::string file_name = NormalizePath(dbname_ + "/test_file"); + std::unique_ptr<log::Writer> log_writer = nullptr; + 
constexpr uint64_t log_number = 0; + { + std::unique_ptr<FSWritableFile> file; + const Status s = + fault_fs->NewWritableFile(file_name, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr<WritableFileWriter> fwriter( + new WritableFileWriter(std::move(file), file_name, FileOptions())); + log_writer.reset(new log::Writer(std::move(fwriter), log_number, + /*recycle_log_files=*/false)); + } + + fault_fs->SetRandomWriteError( + 0xdeadbeef, /*one_in=*/1, IOStatus::IOError("Injected IOError"), + /*inject_for_all_file_types=*/true, /*types=*/{}); + + { + VersionEdit edit; + edit.SetColumnFamily(0); + std::string buf; + assert(edit.EncodeTo(&buf)); + const Status s = log_writer->AddRecord(buf); + ASSERT_NOK(s); + } + + fault_fs->DisableWriteErrorInjection(); + + // Closing the log writer will cause WritableFileWriter::Close() and flush + // remaining data from its buffer to underlying file. + log_writer.reset(); + + { + std::unique_ptr<FSSequentialFile> file; + Status s = + fault_fs->NewSequentialFile(file_name, FileOptions(), &file, nullptr); + ASSERT_OK(s); + std::unique_ptr<SequentialFileReader> freader( + new SequentialFileReader(std::move(file), file_name)); + Status log_read_s; + class LogReporter : public log::Reader::Reporter { + public: + Status* status_; + explicit LogReporter(Status* _s) : status_(_s) {} + void Corruption(size_t /*bytes*/, const Status& _s) override { + if (status_->ok()) { + *status_ = _s; + } + } + } reporter(&log_read_s); + std::unique_ptr<log::Reader> log_reader(new log::Reader( + nullptr, std::move(freader), &reporter, /*checksum=*/true, log_number)); + Slice record; + std::string data; + size_t count = 0; + while (log_reader->ReadRecord(&record, &data) && log_read_s.ok()) { + VersionEdit edit; + ASSERT_OK(edit.DecodeFrom(data)); + ++count; + } + // Verify that only one version edit exists in the file. 
+ ASSERT_EQ(1, count); + } +} + INSTANTIATE_TEST_CASE_P( FaultTest, FaultInjectionTest, ::testing::Values(std::make_tuple(false, kDefault, kEnd), diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index 91d0067b8..d79f74fa7 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -516,6 +516,19 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { } else { s = writable_file_->Append(Slice(src, allowed), IOOptions(), nullptr); } + if (!s.ok()) { + // If writable_file_->Append() failed, then the data may or may not + // exist in the underlying memory buffer, OS page cache, remote file + // system's buffer, etc. If WritableFileWriter keeps the data in + // buf_, then a future Close() or write retry may send the data to + // the underlying file again. If the data does exist in the + // underlying buffer and gets written to the file eventually despite + // returning error, the file may end up with two duplicate pieces of + // data. Therefore, clear the buf_ at the WritableFileWriter layer + // and let caller determine error handling. + buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; + } SetPerfLevel(prev_perf_level); } #ifndef ROCKSDB_LITE @@ -603,6 +616,17 @@ IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, } #endif if (!s.ok()) { + // If writable_file_->Append() failed, then the data may or may not + // exist in the underlying memory buffer, OS page cache, remote file + // system's buffer, etc. If WritableFileWriter keeps the data in + // buf_, then a future Close() or write retry may send the data to + // the underlying file again. If the data does exist in the + // underlying buffer and gets written to the file eventually despite + // returning error, the file may end up with two duplicate pieces of + // data. Therefore, clear the buf_ at the WritableFileWriter layer + // and let caller determine error handling. 
+ buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; return s; } } @@ -652,13 +676,13 @@ IOStatus WritableFileWriter::WriteDirect() { assert((next_write_offset_ % alignment) == 0); // Calculate whole page final file advance if all writes succeed - size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize()); + const size_t file_advance = + TruncateToPageBoundary(alignment, buf_.CurrentSize()); // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buf_.CurrentSize() - file_advance; + // will write it again in the future either on Close() OR when the current + // whole page fills out. + const size_t leftover_tail = buf_.CurrentSize() - file_advance; // Round up and pad buf_.PadToAlignmentWith(0); @@ -741,13 +765,13 @@ IOStatus WritableFileWriter::WriteDirectWithChecksum() { assert((next_write_offset_ % alignment) == 0); // Calculate whole page final file advance if all writes succeed - size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize()); + const size_t file_advance = + TruncateToPageBoundary(alignment, buf_.CurrentSize()); // Calculate the leftover tail, we write it here padded with zeros BUT we - // will write - // it again in the future either on Close() OR when the current whole page - // fills out - size_t leftover_tail = buf_.CurrentSize() - file_advance; + // will write it again in the future either on Close() OR when the current + // whole page fills out. + const size_t leftover_tail = buf_.CurrentSize() - file_advance; // Round up, pad, and combine the checksum. 
size_t last_cur_size = buf_.CurrentSize(); diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index ae9eda952..b33964489 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -507,6 +507,8 @@ class FaultInjectionTestFS : public FileSystemWrapper { int read_error_one_in() const { return read_error_one_in_.load(); } + int write_error_one_in() const { return write_error_one_in_; } + // We capture a backtrace every time a fault is injected, for debugging // purposes. This call prints the backtrace to stderr and frees the // saved callstack