From a904c62d286becdf125015bf292cd6f5b17d19fa Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Fri, 25 Jun 2021 00:46:33 -0700 Subject: [PATCH] Using existing crc32c checksum in checksum handoff for Manifest and WAL (#8412) Summary: In PR https://github.com/facebook/rocksdb/issues/7523 , checksum handoff is introduced in RocksDB for WAL, Manifest, and SST files. When user enable checksum handoff for a certain type of file, before the data is written to the lower layer storage system, we calculate the checksum (crc32c) of each piece of data and pass the checksum down with the data, such that data verification can be down by the lower layer storage system if it has the capability. However, it cannot cover the whole lifetime of the data in the memory and also it potentially introduces extra checksum calculation overhead. In this PR, we introduce a new interface in WritableFileWriter::Append, which allows the caller be able to pass the data and the checksum (crc32c) together. In this way, WritableFileWriter can directly use the pass-in checksum (crc32c) to generate the checksum of data being passed down to the storage system. It saves the calculation overhead and achieves higher protection coverage. When a new checksum is added with the data, we use Crc32cCombine https://github.com/facebook/rocksdb/issues/8305 to combine the existing checksum and the new checksum. To avoid the segmenting of data by rate-limiter before it is stored, rate-limiter is called enough times to accumulate enough credits for a certain write. This design only support Manifest and WAL which use log_writer in the current stage. Pull Request resolved: https://github.com/facebook/rocksdb/pull/8412 Test Plan: make check, add new testing cases. Reviewed By: anand1976 Differential Revision: D29151545 Pulled By: zhichao-cao fbshipit-source-id: 75e2278c5126cfd58393c67b1efd18dcc7a30772 --- db/blob/blob_file_builder.cc | 2 +- db/builder.cc | 2 +- db/compaction/compaction_job.cc | 2 +- db/db_impl/db_impl_open.cc | 6 +- db/log_writer.cc | 5 +- db/version_set.cc | 1 + file/writable_file_writer.cc | 265 +++++++++++++++++++++++++++++--- file/writable_file_writer.h | 15 +- table/sst_file_writer.cc | 2 +- util/crc32c.cc | 1 - util/file_reader_writer_test.cc | 177 +++++++++++++++++++++ utilities/fault_injection_fs.cc | 39 ++++- utilities/fault_injection_fs.h | 10 +- 13 files changed, 485 insertions(+), 42 deletions(-) diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc index 674466c71..d9809ef7d 100644 --- a/db/blob/blob_file_builder.cc +++ b/db/blob/blob_file_builder.cc @@ -191,7 +191,7 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() { immutable_options_->clock, io_tracer_, statistics, immutable_options_->listeners, immutable_options_->file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kBlobFile))); + tmp_set.Contains(FileType::kBlobFile), false)); constexpr bool do_flush = false; diff --git a/db/builder.cc b/db/builder.cc index 88c99a084..b37e7dd7d 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -165,7 +165,7 @@ Status BuildTable( std::move(file), fname, file_options, ioptions.clock, io_tracer, ioptions.stats, ioptions.listeners, ioptions.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); builder = NewTableBuilder(tboptions, file_writer.get()); } diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 0a43aa0b4..d764265dd 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1986,7 +1986,7 @@ Status CompactionJob::OpenCompactionOutputFile( std::move(writable_file), fname, file_options_, db_options_.clock, io_tracer_, db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); TableBuilderOptions tboptions( *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()), diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index c4866db17..b19210724 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -300,7 +300,8 @@ Status DBImpl::NewDB(std::vector* new_filenames) { std::unique_ptr file_writer(new WritableFileWriter( std::move(file), manifest, file_options, immutable_db_options_.clock, io_tracer_, nullptr /* stats */, immutable_db_options_.listeners, - nullptr, tmp_set.Contains(FileType::kDescriptorFile))); + nullptr, tmp_set.Contains(FileType::kDescriptorFile), + tmp_set.Contains(FileType::kDescriptorFile))); log::Writer log(std::move(file_writer), 0, false); std::string record; new_db.EncodeTo(&record); @@ -1540,7 +1541,8 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, std::unique_ptr file_writer(new WritableFileWriter( std::move(lfile), log_fname, opt_file_options, immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners, - nullptr, tmp_set.Contains(FileType::kWalFile))); + nullptr, tmp_set.Contains(FileType::kWalFile), + tmp_set.Contains(FileType::kWalFile))); *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, immutable_db_options_.manual_wal_flush); diff --git a/db/log_writer.cc b/db/log_writer.cc index 6a82f31e1..e2e596596 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -145,7 +145,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { } // Compute the crc of the record type and the payload. - crc = crc32c::Extend(crc, ptr, n); + uint32_t payload_crc = crc32c::Value(ptr, n); + crc = crc32c::Crc32cCombine(crc, payload_crc, n); crc = crc32c::Mask(crc); // Adjust for storage TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum", &crc); @@ -154,7 +155,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { // Write the header and the payload IOStatus s = dest_->Append(Slice(buf, header_size)); if (s.ok()) { - s = dest_->Append(Slice(ptr, n)); + s = dest_->Append(Slice(ptr, n), payload_crc); } block_offset_ += header_size + n; return s; diff --git a/db/version_set.cc b/db/version_set.cc index 777d5cf6f..b99a8b39c 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -4133,6 +4133,7 @@ Status VersionSet::ProcessManifestWrites( std::unique_ptr file_writer(new WritableFileWriter( std::move(descriptor_file), descriptor_fname, opt_file_opts, clock_, io_tracer_, nullptr, db_options_->listeners, nullptr, + tmp_set.Contains(FileType::kDescriptorFile), tmp_set.Contains(FileType::kDescriptorFile))); descriptor_log_.reset( new log::Writer(std::move(file_writer), 0, false)); diff --git a/file/writable_file_writer.cc b/file/writable_file_writer.cc index d009542c7..b20f0ea49 100644 --- a/file/writable_file_writer.cc +++ b/file/writable_file_writer.cc @@ -36,7 +36,8 @@ Status WritableFileWriter::Create(const std::shared_ptr& fs, return s; } -IOStatus WritableFileWriter::Append(const Slice& data) { +IOStatus WritableFileWriter::Append(const Slice& data, + uint32_t crc32c_checksum) { const char* src = data.data(); size_t left = data.size(); IOStatus s; @@ -81,26 +82,74 @@ IOStatus WritableFileWriter::Append(const Slice& data) { assert(buf_.CurrentSize() == 0); } - // We never write directly to disk with direct I/O on. - // or we simply use it for its original purpose to accumulate many small - // chunks - if (use_direct_io() || (buf_.Capacity() >= left)) { - while (left > 0) { - size_t appended = buf_.Append(src, left); - left -= appended; - src += appended; - - if (left > 0) { - s = Flush(); - if (!s.ok()) { - break; + if (perform_data_verification_ && buffered_data_with_checksum_ && + crc32c_checksum != 0) { + // Since we want to use the checksum of the input data, we cannot break it + // into several pieces. We will only write them in the buffer when buffer + // size is enough. Otherwise, we will directly write it down. + if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) { + if ((buf_.Capacity() - buf_.CurrentSize()) >= left) { + size_t appended = buf_.Append(src, left); + if (appended != left) { + s = IOStatus::Corruption("Write buffer append failure"); + } + buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine( + buffered_data_crc32c_checksum_, crc32c_checksum, appended); + } else { + while (left > 0) { + size_t appended = buf_.Append(src, left); + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, src, appended); + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } } } + } else { + assert(buf_.CurrentSize() == 0); + buffered_data_crc32c_checksum_ = crc32c_checksum; + s = WriteBufferedWithChecksum(src, left); } } else { - // Writing directly to file bypassing the buffer - assert(buf_.CurrentSize() == 0); - s = WriteBuffered(src, left); + // In this case, either we do not need to do the data verification or + // caller does not provide the checksum of the data (crc32c_checksum = 0). + // + // We never write directly to disk with direct I/O on. + // or we simply use it for its original purpose to accumulate many small + // chunks + if (use_direct_io() || (buf_.Capacity() >= left)) { + while (left > 0) { + size_t appended = buf_.Append(src, left); + if (perform_data_verification_ && buffered_data_with_checksum_) { + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, src, appended); + } + left -= appended; + src += appended; + + if (left > 0) { + s = Flush(); + if (!s.ok()) { + break; + } + } + } + } else { + // Writing directly to file bypassing the buffer + assert(buf_.CurrentSize() == 0); + if (perform_data_verification_ && buffered_data_with_checksum_) { + buffered_data_crc32c_checksum_ = crc32c::Value(src, left); + s = WriteBufferedWithChecksum(src, left); + } else { + s = WriteBuffered(src, left); + } + } } TEST_KILL_RANDOM("WritableFileWriter::Append:1"); @@ -114,6 +163,7 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { assert(pad_bytes < kDefaultPageSize); size_t left = pad_bytes; size_t cap = buf_.Capacity() - buf_.CurrentSize(); + size_t pad_start = buf_.CurrentSize(); // Assume pad_bytes is small compared to buf_ capacity. So we always // use buf_ rather than write directly to file in certain cases like @@ -132,6 +182,11 @@ IOStatus WritableFileWriter::Pad(const size_t pad_bytes) { } pending_sync_ = true; filesize_ += pad_bytes; + if (perform_data_verification_) { + buffered_data_crc32c_checksum_ = + crc32c::Extend(buffered_data_crc32c_checksum_, + buf_.BufferStart() + pad_start, pad_bytes); + } return IOStatus::OK(); } @@ -232,11 +287,19 @@ IOStatus WritableFileWriter::Flush() { if (use_direct_io()) { #ifndef ROCKSDB_LITE if (pending_sync_) { - s = WriteDirect(); + if (perform_data_verification_ && buffered_data_with_checksum_) { + s = WriteDirectWithChecksum(); + } else { + s = WriteDirect(); + } } #endif // !ROCKSDB_LITE } else { - s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + if (perform_data_verification_ && buffered_data_with_checksum_) { + s = WriteBufferedWithChecksum(buf_.BufferStart(), buf_.CurrentSize()); + } else { + s = WriteBuffered(buf_.BufferStart(), buf_.CurrentSize()); + } } if (!s.ok()) { return s; @@ -451,6 +514,76 @@ IOStatus WritableFileWriter::WriteBuffered(const char* data, size_t size) { src += allowed; } buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; + return s; +} + +IOStatus WritableFileWriter::WriteBufferedWithChecksum(const char* data, + size_t size) { + IOStatus s; + assert(!use_direct_io()); + assert(perform_data_verification_ && buffered_data_with_checksum_); + const char* src = data; + size_t left = size; + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; + + // Check how much is allowed. Here, we loop until the rate limiter allows to + // write the entire buffer. + // TODO: need to be improved since it sort of defeats the purpose of the rate + // limiter + size_t data_size = left; + if (rate_limiter_ != nullptr) { + while (data_size > 0) { + size_t tmp_size; + tmp_size = rate_limiter_->RequestToken( + data_size, buf_.Alignment(), writable_file_->GetIOPriority(), stats_, + RateLimiter::OpType::kWrite); + data_size -= tmp_size; + } + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + +#ifndef ROCKSDB_LITE + FileOperationInfo::StartTimePoint start_ts; + uint64_t old_size = writable_file_->GetFileSize(IOOptions(), nullptr); + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + old_size = next_write_offset_; + } +#endif + { + auto prev_perf_level = GetPerfLevel(); + + IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_); + + EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->Append(Slice(src, left), IOOptions(), v_info, + nullptr); + SetPerfLevel(prev_perf_level); + } +#ifndef ROCKSDB_LITE + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::steady_clock::now(); + NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s); + } +#endif + if (!s.ok()) { + return s; + } + } + + IOSTATS_ADD(bytes_written, left); + TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0"); + + // Buffer write is successful, reset the buffer current size to 0 and reset + // the corresponding checksum value + buf_.Size(0); + buffered_data_crc32c_checksum_ = 0; return s; } @@ -565,5 +698,99 @@ IOStatus WritableFileWriter::WriteDirect() { } return s; } + +IOStatus WritableFileWriter::WriteDirectWithChecksum() { + assert(use_direct_io()); + assert(perform_data_verification_ && buffered_data_with_checksum_); + IOStatus s; + const size_t alignment = buf_.Alignment(); + assert((next_write_offset_ % alignment) == 0); + + // Calculate whole page final file advance if all writes succeed + size_t file_advance = TruncateToPageBoundary(alignment, buf_.CurrentSize()); + + // Calculate the leftover tail, we write it here padded with zeros BUT we + // will write + // it again in the future either on Close() OR when the current whole page + // fills out + size_t leftover_tail = buf_.CurrentSize() - file_advance; + + // Round up, pad, and combine the checksum. + size_t last_cur_size = buf_.CurrentSize(); + buf_.PadToAlignmentWith(0); + size_t padded_size = buf_.CurrentSize() - last_cur_size; + const char* padded_start = buf_.BufferStart() + last_cur_size; + uint32_t padded_checksum = crc32c::Value(padded_start, padded_size); + buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine( + buffered_data_crc32c_checksum_, padded_checksum, padded_size); + + const char* src = buf_.BufferStart(); + uint64_t write_offset = next_write_offset_; + size_t left = buf_.CurrentSize(); + DataVerificationInfo v_info; + char checksum_buf[sizeof(uint32_t)]; + + // Check how much is allowed. Here, we loop until the rate limiter allows to + // write the entire buffer. + // TODO: need to be improved since it sort of defeats the purpose of the rate + // limiter + size_t data_size = left; + if (rate_limiter_ != nullptr) { + while (data_size > 0) { + size_t size; + size = rate_limiter_->RequestToken(data_size, buf_.Alignment(), + writable_file_->GetIOPriority(), + stats_, RateLimiter::OpType::kWrite); + data_size -= size; + } + } + + { + IOSTATS_TIMER_GUARD(write_nanos); + TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend"); + FileOperationInfo::StartTimePoint start_ts; + if (ShouldNotifyListeners()) { + start_ts = FileOperationInfo::StartNow(); + } + // direct writes must be positional + EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_); + v_info.checksum = Slice(checksum_buf, sizeof(uint32_t)); + s = writable_file_->PositionedAppend(Slice(src, left), write_offset, + IOOptions(), v_info, nullptr); + + if (ShouldNotifyListeners()) { + auto finish_ts = std::chrono::steady_clock::now(); + NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s); + } + if (!s.ok()) { + // In this case, we do not change buffered_data_crc32c_checksum_ because + // it still aligns with the data in the buffer. + buf_.Size(file_advance + leftover_tail); + buffered_data_crc32c_checksum_ = + crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + return s; + } + } + + IOSTATS_ADD(bytes_written, left); + assert((next_write_offset_ % alignment) == 0); + + if (s.ok()) { + // Move the tail to the beginning of the buffer + // This never happens during normal Append but rather during + // explicit call to Flush()/Sync() or Close(). Also the buffer checksum will + // recalculated accordingly. + buf_.RefitTail(file_advance, leftover_tail); + // Adjust the checksum value to align with the data in the buffer + buffered_data_crc32c_checksum_ = + crc32c::Value(buf_.BufferStart(), buf_.CurrentSize()); + // This is where we start writing next time which may or not be + // the actual file size on disk. They match if the buffer size + // is a multiple of whole pages otherwise filesize_ is leftover_tail + // behind + next_write_offset_ += file_advance; + } + return s; +} #endif // !ROCKSDB_LITE } // namespace ROCKSDB_NAMESPACE diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h index e6894281c..56741cd0f 100644 --- a/file/writable_file_writer.h +++ b/file/writable_file_writer.h @@ -144,6 +144,8 @@ class WritableFileWriter { std::unique_ptr checksum_generator_; bool checksum_finalized_; bool perform_data_verification_; + uint32_t buffered_data_crc32c_checksum_; + bool buffered_data_with_checksum_; public: WritableFileWriter( @@ -153,7 +155,8 @@ class WritableFileWriter { Statistics* stats = nullptr, const std::vector>& listeners = {}, FileChecksumGenFactory* file_checksum_gen_factory = nullptr, - bool perform_data_verification = false) + bool perform_data_verification = false, + bool buffered_data_with_checksum = false) : file_name_(_file_name), writable_file_(std::move(file), io_tracer, _file_name), clock_(clock), @@ -171,7 +174,9 @@ class WritableFileWriter { listeners_(), checksum_generator_(nullptr), checksum_finalized_(false), - perform_data_verification_(perform_data_verification) { + perform_data_verification_(perform_data_verification), + buffered_data_crc32c_checksum_(0), + buffered_data_with_checksum_(buffered_data_with_checksum) { TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", reinterpret_cast(max_buffer_size_)); buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); @@ -210,7 +215,9 @@ class WritableFileWriter { std::string file_name() const { return file_name_; } - IOStatus Append(const Slice& data); + // When this Append API is called, if the crc32c_checksum is not provided, we + // will calculate the checksum internally. + IOStatus Append(const Slice& data, uint32_t crc32c_checksum = 0); IOStatus Pad(const size_t pad_bytes); @@ -251,9 +258,11 @@ class WritableFileWriter { // DMA such as in Direct I/O mode #ifndef ROCKSDB_LITE IOStatus WriteDirect(); + IOStatus WriteDirectWithChecksum(); #endif // !ROCKSDB_LITE // Normal write IOStatus WriteBuffered(const char* data, size_t size); + IOStatus WriteBufferedWithChecksum(const char* data, size_t size); IOStatus RangeSync(uint64_t offset, uint64_t nbytes); IOStatus SyncInternal(bool use_fsync); }; diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 2e4a9202e..0d46c764b 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -264,7 +264,7 @@ Status SstFileWriter::Open(const std::string& file_path) { std::move(sst_file), file_path, r->env_options, r->ioptions.clock, nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners, r->ioptions.file_checksum_gen_factory.get(), - tmp_set.Contains(FileType::kTableFile))); + tmp_set.Contains(FileType::kTableFile), false)); // TODO(tec) : If table_factory is using compressed block cache, we will // be adding the external sst file blocks into it, which is wasteful. diff --git a/util/crc32c.cc b/util/crc32c.cc index d2cbb6d90..87fbb9f84 100644 --- a/util/crc32c.cc +++ b/util/crc32c.cc @@ -1283,7 +1283,6 @@ uint32_t Extend(uint32_t crc, const char* buf, size_t size) { return ChosenExtend(crc, buf, size); } - // The code for crc32c combine, copied with permission from folly // Standard galois-field multiply. The only modification is that a, diff --git a/util/file_reader_writer_test.cc b/util/file_reader_writer_test.cc index 449747024..a6e53223b 100644 --- a/util/file_reader_writer_test.cc +++ b/util/file_reader_writer_test.cc @@ -6,16 +6,20 @@ #include #include +#include "db/db_test_util.h" #include "env/mock_env.h" #include "file/line_file_reader.h" #include "file/random_access_file_reader.h" +#include "file/read_write_util.h" #include "file/readahead_raf.h" #include "file/sequence_file_reader.h" #include "file/writable_file_writer.h" #include "rocksdb/file_system.h" #include "test_util/testharness.h" #include "test_util/testutil.h" +#include "util/crc32c.h" #include "util/random.h" +#include "utilities/fault_injection_fs.h" namespace ROCKSDB_NAMESPACE { @@ -230,6 +234,179 @@ TEST_F(WritableFileWriterTest, IncrementalBuffer) { } } +class DBWritableFileWriterTest : public DBTestBase { + public: + DBWritableFileWriterTest() + : DBTestBase("/db_secondary_cache_test", /*env_do_fsync=*/true) { + fault_fs_.reset(new FaultInjectionTestFS(env_->GetFileSystem())); + fault_env_.reset(new CompositeEnvWrapper(env_, fault_fs_)); + } + + std::shared_ptr fault_fs_; + std::unique_ptr fault_env_; +}; + +TEST_F(DBWritableFileWriterTest, AppendWithChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 2000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendVerifyNoChecksum) { + FileOptions file_options = FileOptions(); + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, false)); + + Random rnd(301); + std::string data = rnd.RandomString(1000); + uint32_t data_crc32c = crc32c::Value(data.c_str(), data.size()); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + ASSERT_OK(file_writer->Flush()); + Random size_r(47); + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + Destroy(options); +} + +TEST_F(DBWritableFileWriterTest, AppendWithChecksumRateLimiter) { + FileOptions file_options = FileOptions(); + file_options.rate_limiter = nullptr; + Options options = GetDefaultOptions(); + options.create_if_missing = true; + DestroyAndReopen(options); + std::string fname = this->dbname_ + "/test_file"; + std::unique_ptr writable_file_ptr; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options, &writable_file_ptr, + /*dbg*/ nullptr)); + std::unique_ptr file; + file.reset(new TestFSWritableFile( + fname, file_options, std::move(writable_file_ptr), fault_fs_.get())); + std::unique_ptr file_writer; + ImmutableOptions ioptions(options); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + fault_fs_->SetChecksumHandoffFuncType(ChecksumType::kCRC32c); + + Random rnd(301); + std::string data; + uint32_t data_crc32c; + uint64_t start = fault_env_->NowMicros(); + Random size_r(47); + uint64_t bytes_written = 0; + for (int i = 0; i < 100; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + bytes_written += static_cast(data.size()); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + bytes_written += static_cast(data.size()); + } + uint64_t elapsed = fault_env_->NowMicros() - start; + double raw_rate = bytes_written * 1000000.0 / elapsed; + ASSERT_OK(file_writer->Close()); + + // Set the rate-limiter + FileOptions file_options1 = FileOptions(); + file_options1.rate_limiter = + NewGenericRateLimiter(static_cast(0.5 * raw_rate)); + fname = this->dbname_ + "/test_file_1"; + std::unique_ptr writable_file_ptr1; + ASSERT_OK(fault_fs_->NewWritableFile(fname, file_options1, + &writable_file_ptr1, + /*dbg*/ nullptr)); + file.reset(new TestFSWritableFile( + fname, file_options1, std::move(writable_file_ptr1), fault_fs_.get())); + // Enable checksum handoff for this file, but do not enable buffer checksum. + // So Append with checksum logic will not be triggered + file_writer.reset(new WritableFileWriter( + std::move(file), fname, file_options1, SystemClock::Default().get(), + nullptr, ioptions.stats, ioptions.listeners, + ioptions.file_checksum_gen_factory.get(), true, true)); + + for (int i = 0; i < 1000; i++) { + data = rnd.RandomString((static_cast(size_r.Next()) % 10000)); + data_crc32c = crc32c::Value(data.c_str(), data.size()); + ASSERT_OK(file_writer->Append(Slice(data.c_str()), data_crc32c)); + + data = rnd.RandomString((static_cast(size_r.Next()) % 97)); + ASSERT_OK(file_writer->Append(Slice(data.c_str()))); + ASSERT_OK(file_writer->Flush()); + } + ASSERT_OK(file_writer->Close()); + if (file_options1.rate_limiter != nullptr) { + delete file_options1.rate_limiter; + } + + Destroy(options); +} + #ifndef ROCKSDB_LITE TEST_F(WritableFileWriterTest, AppendStatusReturn) { class FakeWF : public FSWritableFile { diff --git a/utilities/fault_injection_fs.cc b/utilities/fault_injection_fs.cc index 6c12c5be2..00b5eaa4e 100644 --- a/utilities/fault_injection_fs.cc +++ b/utilities/fault_injection_fs.cc @@ -140,8 +140,8 @@ IOStatus TestFSWritableFile::Append(const Slice& data, const IOOptions&, // By setting the IngestDataCorruptionBeforeWrite(), the data corruption is // simulated. IOStatus TestFSWritableFile::Append( - const Slice& data, const IOOptions&, - const DataVerificationInfo& verification_info, IODebugContext*) { + const Slice& data, const IOOptions& options, + const DataVerificationInfo& verification_info, IODebugContext* dbg) { MutexLock l(&mutex_); if (!fs_->IsFilesystemActive()) { return fs_->GetError(); @@ -161,10 +161,39 @@ IOStatus TestFSWritableFile::Append( "current data checksum: " + checksum; return IOStatus::Corruption(msg); } + if (target_->use_direct_io()) { + target_->Append(data, options, dbg).PermitUncheckedError(); + } else { + state_.buffer_.append(data.data(), data.size()); + state_.pos_ += data.size(); + fs_->WritableFileAppended(state_); + } + return IOStatus::OK(); +} - state_.buffer_.append(data.data(), data.size()); - state_.pos_ += data.size(); - fs_->WritableFileAppended(state_); +IOStatus TestFSWritableFile::PositionedAppend( + const Slice& data, uint64_t offset, const IOOptions& options, + const DataVerificationInfo& verification_info, IODebugContext* dbg) { + MutexLock l(&mutex_); + if (!fs_->IsFilesystemActive()) { + return fs_->GetError(); + } + if (fs_->ShouldDataCorruptionBeforeWrite()) { + return IOStatus::Corruption("Data is corrupted!"); + } + + // Calculate the checksum + std::string checksum; + CalculateTypedChecksum(fs_->GetChecksumHandoffFuncType(), data.data(), + data.size(), &checksum); + if (fs_->GetChecksumHandoffFuncType() != ChecksumType::kNoChecksum && + checksum != verification_info.checksum.ToString()) { + std::string msg = "Data is corrupted! Origin data checksum: " + + verification_info.checksum.ToString() + + "current data checksum: " + checksum; + return IOStatus::Corruption(msg); + } + target_->PositionedAppend(data, offset, options, dbg); return IOStatus::OK(); } diff --git a/utilities/fault_injection_fs.h b/utilities/fault_injection_fs.h index a51f46f81..2ee9afe04 100644 --- a/utilities/fault_injection_fs.h +++ b/utilities/fault_injection_fs.h @@ -65,9 +65,9 @@ class TestFSWritableFile : public FSWritableFile { virtual ~TestFSWritableFile(); virtual IOStatus Append(const Slice& data, const IOOptions&, IODebugContext*) override; - virtual IOStatus Append(const Slice& data, const IOOptions&, + virtual IOStatus Append(const Slice& data, const IOOptions& options, const DataVerificationInfo& verification_info, - IODebugContext*) override; + IODebugContext* dbg) override; virtual IOStatus Truncate(uint64_t size, const IOOptions& options, IODebugContext* dbg) override { return target_->Truncate(size, options, dbg); @@ -84,10 +84,8 @@ class TestFSWritableFile : public FSWritableFile { } IOStatus PositionedAppend(const Slice& data, uint64_t offset, const IOOptions& options, - const DataVerificationInfo& /*verification_info*/, - IODebugContext* dbg) override { - return PositionedAppend(data, offset, options, dbg); - } + const DataVerificationInfo& verification_info, + IODebugContext* dbg) override; virtual size_t GetRequiredBufferAlignment() const override { return target_->GetRequiredBufferAlignment(); }