diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 674466c71..d9809ef7d 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -191,7 +191,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { immutable_options_->clock, io_tracer_, statistics, immutable_options_->listeners, immutable_options_->file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kBlobFile))); + tmp_set.Contains(FileType::kBlobFile), false)); constexpr bool do_flush = false; diff --git a/db/builder.cc b/db/builder.cc index 88c99a084..b37e7dd7d 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -165,7 +165,7 @@ Status BuildTable( std::move(file), fname, file_options, ioptions.clock, io_tracer, ioptions.stats, ioptions.listeners, ioptions.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); builder = NewTableBuilder(tboptions, file_writer.get()); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 0a43aa0b4..d764265dd 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1986,7 +1986,7 @@ Status CompactionJob::OpenCompactionOutputFile( std::move(writable_file), fname, file_options_, db_options_.clock, io_tracer_, db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); TableBuilderOptions tboptions( *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()), diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index c4866db17..b19210724 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -300,7 +300,8 @@ Status DBImpl::NewDB(std::vector* new_filenames) { std::unique_ptr file_writer(new WritableFileWriter( std::move(file), manifest, file_options, immutable_db_options_.clock, io_tracer_, nullptr /* stats */, immutable_db_options_.listeners, - 
nullptr, tmp_set.Contains(FileType::kDescriptorFile))); + nullptr, tmp_set.Contains(FileType::kDescriptorFile), + tmp_set.Contains(FileType::kDescriptorFile))); log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); @@ -1540,7 +1541,8 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, std::unique_ptr file_writer(new WritableFileWriter( std::move(lfile), log_fname, opt_file_options, immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners, - nullptr, tmp_set.Contains(FileType::kWalFile))); + nullptr, tmp_set.Contains(FileType::kWalFile), + tmp_set.Contains(FileType::kWalFile))); *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush); diff --git a/db/log_writer.cc b/db/log_writer.cc index 6a82f31e1..e2e596596 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -145,7 +145,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { } // Compute the crc of the record type and the payload. 
- crc = crc32c::Extend(crc, ptr, n); + uint32_t payload_crc = crc32c::Value(ptr, n); + crc = crc32c::Crc32cCombine(crc, payload_crc, n); crc = crc32c::Mask(crc); // Adjust for storage TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", &crc); @@ -154,7 +155,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { // Write the header and the payload IOStatus s = dest_->Append(Slice(buf, header_size)); if (s.ok()) { - s = dest_->Append(Slice(ptr, n)); + s = dest_->Append(Slice(ptr, n), payload_crc); } block_offset_ += header_size + n; return s; diff --git a/db/version_set.cc b/db/version_set.cc index 777d5cf6f..b99a8b39c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4133,6 +4133,7 @@ Status VersionSet::ProcessManifestWrites( std::unique_ptr file_writer(new WritableFileWriter( std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_, io_tracer_, nullptr, db_options_->listeners, nullptr, + tmp_set.Contains(FileType::kDescriptorFile), tmp_set.Contains(FileType::kDescriptorFile))); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index d009542c7..b20f0ea49 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -36,7 +36,8 @@ Status WritableFileWriter::Create(const std::shared_ptr& fs, return s; } -IOStatus WritableFileWriter::Append(const Slice& data) { +IOStatus WritableFileWriter::Append(const Slice& data, + uint32_t crc32c_checksum) { const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -81,26 +82,74 @@ IOStatus WritableFileWriter::Append(const Slice& data) { assert(buf_.CurrentSize() == 0); } - // We never write directly to disk with direct I/O on. 
- // or we simply use it for its original purpose to accumulate many small - // chunks - if (use_direct_io() || (buf_.Capacity() >= left)) { - while (left > 0) { - size_t appended = buf_.Append(src, left); - left -= appended; - src += appended; - - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; + if (perform_data_verification_ && buffered_data_with_checksum_ && + crc32c_checksum != 0) { + // Since we want to use the checksum of the input data, we cannot break it + // into several pieces. We will only write them in the buffer when buffer + // size is enough. Otherwise, we will directly write it down. + if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) { + if ((buf_.Capacity() - buf_.CurrentSize()) >= left) { + size_t appended = buf_.Append(src, left); + if (appended != left) { + s = IOStatus::Corruption("Write buffer append failure"); + } + buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine( + buffered_data_crc32c_checksum_, crc32c_checksum, appended); + } else { + while (left > 0) { + size_t appended = buf_.Append(src, left); + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, src, appended); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } } } + } else { + assert(buf_.CurrentSize() == 0); + buffered_data_crc32c_checksum_ = crc32c_checksum; + s = WriteBufferedWithChecksum(src, left); } } else { - // Writing directly to file bypassing the buffer - assert(buf_.CurrentSize() == 0); - s = WriteBuffered(src, left); + // In this case, either we do not need to do the data verification or + // caller does not provide the checksum of the data (crc32c_checksum = 0). + // + // We never write directly to disk with direct I/O on. 
+ // or we simply use it for its original purpose to accumulate many small + // chunks + if (use_direct_io() || (buf_.Capacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + if (perform_data_verification_ && buffered_data_with_checksum_) { + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, src, appended); + } + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } + } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.CurrentSize() == 0); + if (perform_data_verification_ && buffered_data_with_checksum_) { + buffered_data_crc32c_checksum_ = crc32c::Value(src, left); + s = WriteBufferedWithChecksum(src, left); + } else { + s = WriteBuffered(src, left); + } + } } TEST_KILL_RANDOM("WritableFileWriter::Append:1"); @@ -114,6 +163,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); + size_t pad_start = buf_.CurrentSize(); // Assume pad_bytes is small compared to buf_ capacity. 
So we always // use buf_ rather than write directly to file in certain cases like @@ -132,6 +182,11 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { } pending_sync_ = true; filesize_ += pad_bytes; + if (perform_data_verification_) { + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, + buf_.BufferStart() + pad_start, pad_bytes); + } return IOStatus::OK(); } @@ -232,11 +287,19 @@ IOStatus WritableFileWriter::Flush() { if (use_direct_io()) { #ifndef ROCKSDB_LITE if (pending_sync_) { - s = WriteDirect(); + if (perform_data_verification_ && buffered_data_with_checksum_) { + s = WriteDirectWithChecksum(); + } else { + s = WriteDirect(); + } } #endif // !ROCKSDB_LITE } else { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + if (perform_data_verification_ && buffered_data_with_checksum_) { + s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize()); + } else { + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + } } if (!s.ok()) { return s; @@ -451,6 +514,76 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { src += allowed; } buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; + return s; +} + +IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, + size_t size) { + IOStatus s; + assert(!use_direct_io()); + assert(perform_data_verification_ && buffered_data_with_checksum_); + const char* src = data; + size_t left = size; + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; + + // Check how much is allowed. Here, we loop until the rate limiter allows to + // write the entire buffer. 
+ // TODO: need to be improved since it sort of defeats the purpose of the rate + // limiter + size_t data_size = left; + if (rate_limiter_ != nullptr) { + while (data_size > 0) { + size_t tmp_size; + tmp_size = rate_limiter_->RequestToken( + data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + data_size -= tmp_size; + } + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint start_ts; + uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + old_size = next_write_offset_; + } +#endif + { + auto prev_perf_level = GetPerfLevel(); + + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_); + + EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, + nullptr); + SetPerfLevel(prev_perf_level); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::steady_clock::now(); + NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s); + } +#endif + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, left); + TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0"); + + // Buffer write is successful, reset the buffer current size to 0 and reset + // the corresponding checksum value + buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; return s; } @@ -565,5 +698,99 @@ IOStatus WritableFileWriter::WriteDirect() { } return s; } + +IOStatus WritableFileWriter::WriteDirectWithChecksum() { + assert(use_direct_io()); + assert(perform_data_verification_ && buffered_data_with_checksum_); + IOStatus s; + const size_t alignment = buf_.Alignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes 
succeed + size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.CurrentSize() - file_advance; + + // Round up, pad, and combine the checksum. + size_t last_cur_size = buf_.CurrentSize(); + buf_.PadToAlignmentWith(0); + size_t padded_size = buf_.CurrentSize() - last_cur_size; + const char* padded_start = buf_.BufferStart() + last_cur_size; + uint32_t padded_checksum = crc32c::Value(padded_start, padded_size); + buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine( + buffered_data_crc32c_checksum_, padded_checksum, padded_size); + + const char* src = buf_.BufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.CurrentSize(); + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; + + // Check how much is allowed. Here, we loop until the rate limiter allows to + // write the entire buffer. 
+ // TODO: need to be improved since it sort of defeats the purpose of the rate + // limiter + size_t data_size = left; + if (rate_limiter_ != nullptr) { + while (data_size > 0) { + size_t size; + size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + data_size -= size; + } + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + } + // direct writes must be positional + EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->PositionedAppend(Slice(src, left), write_offset, + IOOptions(), v_info, nullptr); + + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::steady_clock::now(); + NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s); + } + if (!s.ok()) { + // The write failed: drop the zero padding added above and recompute + // buffered_data_crc32c_checksum_ over the data that stays in the buffer. + buf_.Size(file_advance + leftover_tail); + buffered_data_crc32c_checksum_ = + crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + return s; + } + } + + IOSTATS_ADD(bytes_written, left); + assert((next_write_offset_ % alignment) == 0); + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close(). Also the buffer checksum will + // be recalculated accordingly. + buf_.RefitTail(file_advance, leftover_tail); + // Adjust the checksum value to align with the data in the buffer + buffered_data_crc32c_checksum_ = + crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + // This is where we start writing next time which may or may not be + // the actual file size on disk. 
They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index e6894281c..56741cd0f 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -144,6 +144,8 @@ class WritableFileWriter { std::unique_ptr checksum_generator_; bool checksum_finalized_; bool perform_data_verification_; + uint32_t buffered_data_crc32c_checksum_; + bool buffered_data_with_checksum_; public: WritableFileWriter( @@ -153,7 +155,8 @@ class WritableFileWriter { Statistics* stats = nullptr, const std::vector>& listeners = {}, FileChecksumGenFactory* file_checksum_gen_factory = nullptr, - bool perform_data_verification = false) + bool perform_data_verification = false, + bool buffered_data_with_checksum = false) : file_name_(_file_name), writable_file_(std::move(file), io_tracer, _file_name), clock_(clock), @@ -171,7 +174,9 @@ class WritableFileWriter { listeners_(), checksum_generator_(nullptr), checksum_finalized_(false), - perform_data_verification_(perform_data_verification) { + perform_data_verification_(perform_data_verification), + buffered_data_crc32c_checksum_(0), + buffered_data_with_checksum_(buffered_data_with_checksum) { TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", reinterpret_cast(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); @@ -210,7 +215,9 @@ class WritableFileWriter { std::string file_name() const { return file_name_; } - IOStatus Append(const Slice& data); + // When this Append API is called, if the crc32c_checksum is not provided, we + // will calculate the checksum internally. 
+ IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0); IOStatus Pad(const size_t pad_bytes); @@ -251,9 +258,11 @@ class WritableFileWriter { // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE IOStatus WriteDirect(); + IOStatus WriteDirectWithChecksum(); #endif // !ROCKSDB_LITE // Normal write IOStatus WriteBuffered(const char* data, size_t size); + IOStatus WriteBufferedWithChecksum(const char* data, size_t size); IOStatus RangeSync(uint64_t offset, uint64_t nbytes); IOStatus SyncInternal(bool use_fsync); }; diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 2e4a9202e..0d46c764b 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -264,7 +264,7 @@ Status SstFileWriter::Open(const std::string& file_path) { std::move(sst_file), file_path, r->env_options, r->ioptions.clock, nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners, r->ioptions.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful. diff --git a/util/crc32c.cc b/util/crc32c.cc index d2cbb6d90..87fbb9f84 100644 --- a/util/crc32c.cc +++ b/util/crc32c.cc @@ -1283,7 +1283,6 @@ uint32_t Extend(uint32_t crc, const char* buf, size_t size) { return ChosenExtend(crc, buf, size); } - // The code for crc32c combine, copied with permission from folly // Standard galois-field multiply. 
The only modification is that a, diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 449747024..a6e53223b 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -6,16 +6,20 @@ #include #include +#include "db/db_test_util.h" #include "env/mock_env.h" #include "file/line_file_reader.h" #include "file/random_access_file_reader.h" +#include "file/read_write_util.h" #include "file/readahead_raf.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "rocksdb/file_system.h" #include "test_util/testharness.h" #include "test_util/testutil.h" +#include "util/crc32c.h" #include "util/random.h" +#include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { @@ -230,6 +234,179 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) { } } +class DBWritableFileWriterTest : public DBTestBase { + public: + DBWritableFileWriterTest() + : DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) { + fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem())); + fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_)); + } + + std::shared_ptr fault_fs_; + std::unique_ptr fault_env_; +}; + +TEST_F(DBWritableFileWriterTest, AppendWithChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + 
ioptions.file_checksum_gen_factory.get(), true, true)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 2000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file, but do not enable buffer checksum. 
+ // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, false)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) { + FileOptions file_options = FileOptions(); + file_options.rate_limiter = nullptr; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file and also enable buffer checksum,
+ // so the Append with checksum logic will be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + Random rnd(301); + std::string data; + uint32_t data_crc32c; + uint64_t start = fault_env_->NowMicros(); + Random size_r(47); + uint64_t bytes_written = 0; + for (int i = 0; i < 100; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + bytes_written += static_cast(data.size()); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + bytes_written += static_cast(data.size()); + } + uint64_t elapsed = fault_env_->NowMicros() - start; + double raw_rate = bytes_written * 1000000.0 / elapsed; + ASSERT_OK(file_writer->Close()); + + // Set the rate-limiter + FileOptions file_options1 = FileOptions(); + file_options1.rate_limiter = + NewGenericRateLimiter(static_cast(0.5 * raw_rate)); + fname = this->dbname_ + "/test_file_1"; + std::unique_ptr writable_file_ptr1; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1, + &writable_file_ptr1, + /*dbg*/ nullptr)); + file.reset(new TestFSWritableFile( + fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get())); + // Enable checksum handoff for this file and also enable buffer checksum,
+ // so the Append with checksum logic will be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options1, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + if (file_options1.rate_limiter != nullptr) { + delete file_options1.rate_limiter; + } + + Destroy(options); +} + #ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public FSWritableFile { diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index 6c12c5be2..00b5eaa4e 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -140,8 +140,8 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions&, // By setting the IngestDataCorruptionBeforeWrite(), the data corruption is // simulated. 
IOStatus TestFSWritableFile::Append( - const Slice& data, const IOOptions&, - const DataVerificationInfo& verification_info, IODebugContext*) { + const Slice& data, const IOOptions& options, + const DataVerificationInfo& verification_info, IODebugContext* dbg) { MutexLock l(&mutex_); if (!fs_->IsFilesystemActive()) { return fs_->GetError(); @@ -161,10 +161,43 @@ IOStatus TestFSWritableFile::Append( "current data checksum: " + checksum; return IOStatus::Corruption(msg); } + if (target_->use_direct_io()) { + target_->Append(data, options, dbg).PermitUncheckedError(); + } else { + state_.buffer_.append(data.data(), data.size()); + state_.pos_ += data.size(); + fs_->WritableFileAppended(state_); + } + return IOStatus::OK(); +} - state_.buffer_.append(data.data(), data.size()); - state_.pos_ += data.size(); - fs_->WritableFileAppended(state_); +IOStatus TestFSWritableFile::PositionedAppend( + const Slice& data, uint64_t offset, const IOOptions& options, + const DataVerificationInfo& verification_info, IODebugContext* dbg) { + MutexLock l(&mutex_); + if (!fs_->IsFilesystemActive()) { + return fs_->GetError(); + } + if (fs_->ShouldDataCorruptionBeforeWrite()) { + return IOStatus::Corruption("Data is corrupted!"); + } + + // Calculate the checksum + std::string checksum; + CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(), + data.size(), &checksum); + if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum && + checksum != verification_info.checksum.ToString()) { + std::string msg = "Data is corrupted! 
Origin data checksum: " + + verification_info.checksum.ToString() + + "current data checksum: " + checksum; + return IOStatus::Corruption(msg); + } + // Surface a failed write on the wrapped file rather than dropping its status. + IOStatus io_s = target_->PositionedAppend(data, offset, options, dbg); + if (!io_s.ok()) { + return io_s; + } return IOStatus::OK(); } diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index a51f46f81..2ee9afe04 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -65,9 +65,9 @@ class TestFSWritableFile : public FSWritableFile { virtual ~TestFSWritableFile(); virtual IOStatus Append(const Slice& data, const IOOptions&, IODebugContext*) override; - virtual IOStatus Append(const Slice& data, const IOOptions&, + virtual IOStatus Append(const Slice& data, const IOOptions& options, const DataVerificationInfo& verification_info, - IODebugContext*) override; + IODebugContext* dbg) override; virtual IOStatus Truncate(uint64_t size, const IOOptions& options, IODebugContext* dbg) override { return target_->Truncate(size, options, dbg); @@ -84,10 +84,8 @@ class TestFSWritableFile : public FSWritableFile { } IOStatus PositionedAppend(const Slice& data, uint64_t offset, const IOOptions& options, - const DataVerificationInfo& /*verification_info*/, - IODebugContext* dbg) override { - return PositionedAppend(data, offset, options, dbg); - } + const DataVerificationInfo& verification_info, + IODebugContext* dbg) override; virtual size_t GetRequiredBufferAlignment() const override { return target_->GetRequiredBufferAlignment(); }