diff --git a/HISTORY.md b/HISTORY.md
index 4d6b6d46d..f31603688 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -15,6 +15,9 @@
 * Fix a bug in `FIFOCompactionPicker::PickTTLCompaction` where total_size calculating might cause underflow
 * Fix data race bug in hash linked list memtable. With this bug, read request might temporarily miss an old record in the memtable in a race condition to the hash bucket.
+### Behavior Change
+* Added a checksum handshake during the copying of decompressed WAL fragments. Together with #9875, #10037, #10212, #10114 and #10319, this provides end-to-end integrity protection for write batches during recovery.
+
 ## 7.5.0 (07/15/2022)
 ### New Features
 * Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.
diff --git a/db/log_reader.cc b/db/log_reader.cc
index a9f43ae80..eb5c88d25 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -44,7 +44,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
       compression_type_(kNoCompression),
       compression_type_record_read_(false),
       uncompress_(nullptr),
-      hash_state_(nullptr) {}
+      hash_state_(nullptr),
+      uncompress_hash_state_(nullptr) {}

 Reader::~Reader() {
   delete[] backing_store_;
@@ -54,6 +55,9 @@ Reader::~Reader() {
   if (hash_state_) {
     XXH3_freeState(hash_state_);
   }
+  if (uncompress_hash_state_) {
+    XXH3_freeState(uncompress_hash_state_);
+  }
 }

 // For kAbsoluteConsistency, on clean shutdown we don't expect any error
@@ -64,10 +68,11 @@
 // TODO krad: Evaluate if we need to move to a more strict mode where we
 // restrict the inconsistency to only the last log
 bool Reader::ReadRecord(Slice* record, std::string* scratch,
-                        WALRecoveryMode wal_recovery_mode, uint64_t* checksum) {
+                        WALRecoveryMode wal_recovery_mode,
+                        uint64_t* record_checksum) {
   scratch->clear();
   record->clear();
-  if (checksum != nullptr) {
+  if (record_checksum != nullptr) {
     if (hash_state_ == nullptr) {
       hash_state_ = XXH3_createState();
     }
@@ -85,7 +90,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
   while (true) {
     uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
     size_t drop_size = 0;
-    const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);
+    const unsigned int record_type =
+        ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
     switch (record_type) {
       case kFullType:
       case kRecyclableFullType:
@@ -96,9 +102,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           // at the beginning of the next block.
          ReportCorruption(scratch->size(), "partial record without end(1)");
        }
-        if (checksum != nullptr) {
+        // No need to compute record_checksum here when WAL compression is
+        // enabled: the record consists of a single fragment whose checksum
+        // was already computed in ReadPhysicalRecord()
+        if (record_checksum != nullptr && uncompress_ == nullptr) {
          // No need to stream since the record is a single fragment
-          *checksum = XXH3_64bits(fragment.data(), fragment.size());
+          *record_checksum = XXH3_64bits(fragment.data(), fragment.size());
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
@@ -117,7 +126,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
          ReportCorruption(scratch->size(), "partial record without end(2)");
          XXH3_64bits_reset(hash_state_);
        }
-        if (checksum != nullptr) {
+        if (record_checksum != nullptr) {
          XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
        }
        prospective_record_offset = physical_record_offset;
@@ -131,7 +140,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
-          if (checksum != nullptr) {
+          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
          }
          scratch->append(fragment.data(), fragment.size());
@@ -144,9 +153,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
-          if (checksum != nullptr) {
+          if (record_checksum != nullptr) {
            XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
-            *checksum = XXH3_64bits_digest(hash_state_);
+            *record_checksum = XXH3_64bits_digest(hash_state_);
          }
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
@@ -417,7 +426,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
    }
  }
 }

-unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
+unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
+                                        uint64_t* fragment_checksum) {
   while (true) {
     // We need at least the minimum header size
     if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
@@ -500,6 +510,13 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
     } else {
       // Uncompress compressed records
       uncompressed_record_.clear();
+      if (fragment_checksum != nullptr) {
+        if (uncompress_hash_state_ == nullptr) {
+          uncompress_hash_state_ = XXH3_createState();
+        }
+        XXH3_64bits_reset(uncompress_hash_state_);
+      }
+
       size_t uncompressed_size = 0;
       int remaining = 0;
       do {
@@ -511,10 +528,30 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
          return kBadRecord;
        }
        if (uncompressed_size > 0) {
+          if (fragment_checksum != nullptr) {
+            XXH3_64bits_update(uncompress_hash_state_,
+                               uncompressed_buffer_.get(), uncompressed_size);
+          }
          uncompressed_record_.append(uncompressed_buffer_.get(),
                                      uncompressed_size);
        }
      } while (remaining > 0 || uncompressed_size == kBlockSize);
+
+      if (fragment_checksum != nullptr) {
+        // We could instead update hash_state_ directly and drop this check,
+        // but that would require resetting hash_state_ for full and first
+        // record types to handle edge cases such as consecutive first-type
+        // records. Keep the check since it is cleaner; revert to the above
+        // approach if it turns out to have a performance impact.
+        *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_);
+        uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(),
+                                               uncompressed_record_.size());
+        if (*fragment_checksum != actual_checksum) {
+          // uncompressed_record_ contains bad content that does not match
+          // the actual decompressed content
+          return kBadRecord;
+        }
+      }
      *result = Slice(uncompressed_record_);
      return type;
    }
diff --git a/db/log_reader.h b/db/log_reader.h
index 677939099..2ebeaaca9 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -151,8 +151,10 @@ class Reader {
  std::unique_ptr<char[]> uncompressed_buffer_;
  // Reusable uncompressed record
  std::string uncompressed_record_;
-  // Used for stream hashing log record
+  // Used for stream hashing fragment content in ReadRecord()
  XXH3_state_t* hash_state_;
+  // Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
+  XXH3_state_t* uncompress_hash_state_;

  // Extend record types with the following special values
  enum {
@@ -173,7 +175,11 @@ class Reader {
  };

  // Return type, or one of the preceding special values
-  unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size);
+  // If WAL compression is enabled, fragment_checksum is the checksum of the
+  // fragment computed from the original buffer containing the uncompressed
+  // fragment.
+  unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size,
+                                  uint64_t* fragment_checksum = nullptr);

  // Read some more
  bool ReadMore(size_t* drop_size, int *error);
diff --git a/db/log_test.cc b/db/log_test.cc
index 444955083..668e26629 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -194,8 +194,17 @@ class LogTest
    std::string scratch;
    Slice record;
    bool ret = false;
-    ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode);
+    uint64_t record_checksum;
+    ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode,
+                              &record_checksum);
    if (ret) {
+      if (!allow_retry_read_) {
+        // allow_retry_read_ means we are using FragmentBufferedReader, which
+        // does not support record checksums yet.
+        uint64_t actual_record_checksum =
+            XXH3_64bits(record.data(), record.size());
+        assert(actual_record_checksum == record_checksum);
+      }
      return record.ToString();
    } else {
      return "EOF";
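
For context on how a caller is expected to consume the new record_checksum out-parameter, below is a minimal sketch of a recovery-style read loop that cross-checks each returned record against the streamed checksum, mirroring the log_test.cc change above. It is illustrative only and not part of this patch: the helper name VerifyRecords is hypothetical, and the include paths assume RocksDB's in-tree source layout.

// Illustrative sketch, not part of this patch. VerifyRecords is a
// hypothetical helper; include paths assume RocksDB's in-tree layout.
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

#include "db/log_reader.h"  // log::Reader, WALRecoveryMode, Slice
#include "util/xxhash.h"    // XXH3_64bits

namespace ROCKSDB_NAMESPACE {

// Drain a WAL reader and verify that the checksum streamed out of
// ReadRecord() matches a checksum recomputed over the fully assembled
// record. When WAL compression is enabled, the streamed value is built
// from the decompressed fragments inside ReadPhysicalRecord(), so this
// also exercises the new checksum handshake.
size_t VerifyRecords(log::Reader* reader, WALRecoveryMode mode) {
  std::string scratch;
  Slice record;
  uint64_t record_checksum = 0;
  size_t verified = 0;
  while (reader->ReadRecord(&record, &scratch, mode, &record_checksum)) {
    // Recompute the checksum over the contiguous record bytes and compare
    // it with the value streamed during the read.
    uint64_t actual = XXH3_64bits(record.data(), record.size());
    assert(actual == record_checksum);
    ++verified;
  }
  return verified;
}

}  // namespace ROCKSDB_NAMESPACE

As in the test change, a caller using FragmentBufferedReader would skip the assert, since that reader does not fill in the record checksum yet.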