Add checksum handshake for WAL fragment decompression (#10339)

Summary:
If WAL compression is enabled, WAL fragment decompression results are concatenated together in `log::Reader::ReadPhysicalRecord()`. This PR adds checksum handshake to protect memory corruption during the copying process.

`checksum` is renamed to `record_checksum` in `ReadRecord()` to differentiate it from `checksum_` flag that specifies whether CRC32C checksum is verified.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10339

Test Plan: added checksum verification in log_test.cc, `make check -j32`.

Reviewed By: ajkr

Differential Revision: D37763734

Pulled By: cbi42

fbshipit-source-id: c4faa7c76b9ff1df35026edf31adfe4b47ae3154
main
Changyu Bi 2 years ago committed by Facebook GitHub Bot
parent e637470f64
commit 2fc6df37d6
  1. 3
      HISTORY.md
  2. 59
      db/log_reader.cc
  3. 10
      db/log_reader.h
  4. 11
      db/log_test.cc

@ -15,6 +15,9 @@
* Fix a bug in `FIFOCompactionPicker::PickTTLCompaction` where total_size calculating might cause underflow * Fix a bug in `FIFOCompactionPicker::PickTTLCompaction` where total_size calculating might cause underflow
* Fix data race bug in hash linked list memtable. With this bug, read request might temporarily miss an old record in the memtable in a race condition to the hash bucket. * Fix data race bug in hash linked list memtable. With this bug, read request might temporarily miss an old record in the memtable in a race condition to the hash bucket.
### Behavior Change
* Added checksum handshake during the copying of decompressed WAL fragment. This together with #9875, #10037, #10212, #10114 and #10319 provides end-to-end integrity protection for write batch during recovery.
## 7.5.0 (07/15/2022) ## 7.5.0 (07/15/2022)
### New Features ### New Features
* Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`. * Mempurge option flag `experimental_mempurge_threshold` is now a ColumnFamilyOptions and can now be dynamically configured using `SetOptions()`.

@ -44,7 +44,8 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
compression_type_(kNoCompression), compression_type_(kNoCompression),
compression_type_record_read_(false), compression_type_record_read_(false),
uncompress_(nullptr), uncompress_(nullptr),
hash_state_(nullptr) {} hash_state_(nullptr),
uncompress_hash_state_(nullptr){};
Reader::~Reader() { Reader::~Reader() {
delete[] backing_store_; delete[] backing_store_;
@ -54,6 +55,9 @@ Reader::~Reader() {
if (hash_state_) { if (hash_state_) {
XXH3_freeState(hash_state_); XXH3_freeState(hash_state_);
} }
if (uncompress_hash_state_) {
XXH3_freeState(uncompress_hash_state_);
}
} }
// For kAbsoluteConsistency, on clean shutdown we don't expect any error // For kAbsoluteConsistency, on clean shutdown we don't expect any error
@ -64,10 +68,11 @@ Reader::~Reader() {
// TODO krad: Evaluate if we need to move to a more strict mode where we // TODO krad: Evaluate if we need to move to a more strict mode where we
// restrict the inconsistency to only the last log // restrict the inconsistency to only the last log
bool Reader::ReadRecord(Slice* record, std::string* scratch, bool Reader::ReadRecord(Slice* record, std::string* scratch,
WALRecoveryMode wal_recovery_mode, uint64_t* checksum) { WALRecoveryMode wal_recovery_mode,
uint64_t* record_checksum) {
scratch->clear(); scratch->clear();
record->clear(); record->clear();
if (checksum != nullptr) { if (record_checksum != nullptr) {
if (hash_state_ == nullptr) { if (hash_state_ == nullptr) {
hash_state_ = XXH3_createState(); hash_state_ = XXH3_createState();
} }
@ -85,7 +90,8 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
while (true) { while (true) {
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
size_t drop_size = 0; size_t drop_size = 0;
const unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size); const unsigned int record_type =
ReadPhysicalRecord(&fragment, &drop_size, record_checksum);
switch (record_type) { switch (record_type) {
case kFullType: case kFullType:
case kRecyclableFullType: case kRecyclableFullType:
@ -96,9 +102,12 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
// at the beginning of the next block. // at the beginning of the next block.
ReportCorruption(scratch->size(), "partial record without end(1)"); ReportCorruption(scratch->size(), "partial record without end(1)");
} }
if (checksum != nullptr) { // No need to compute record_checksum since the record
// consists of a single fragment and the checksum is computed
// in ReadPhysicalRecord() if WAL compression is enabled
if (record_checksum != nullptr && uncompress_ == nullptr) {
// No need to stream since the record is a single fragment // No need to stream since the record is a single fragment
*checksum = XXH3_64bits(fragment.data(), fragment.size()); *record_checksum = XXH3_64bits(fragment.data(), fragment.size());
} }
prospective_record_offset = physical_record_offset; prospective_record_offset = physical_record_offset;
scratch->clear(); scratch->clear();
@ -117,7 +126,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
ReportCorruption(scratch->size(), "partial record without end(2)"); ReportCorruption(scratch->size(), "partial record without end(2)");
XXH3_64bits_reset(hash_state_); XXH3_64bits_reset(hash_state_);
} }
if (checksum != nullptr) { if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
} }
prospective_record_offset = physical_record_offset; prospective_record_offset = physical_record_offset;
@ -131,7 +140,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
ReportCorruption(fragment.size(), ReportCorruption(fragment.size(),
"missing start of fragmented record(1)"); "missing start of fragmented record(1)");
} else { } else {
if (checksum != nullptr) { if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
} }
scratch->append(fragment.data(), fragment.size()); scratch->append(fragment.data(), fragment.size());
@ -144,9 +153,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
ReportCorruption(fragment.size(), ReportCorruption(fragment.size(),
"missing start of fragmented record(2)"); "missing start of fragmented record(2)");
} else { } else {
if (checksum != nullptr) { if (record_checksum != nullptr) {
XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); XXH3_64bits_update(hash_state_, fragment.data(), fragment.size());
*checksum = XXH3_64bits_digest(hash_state_); *record_checksum = XXH3_64bits_digest(hash_state_);
} }
scratch->append(fragment.data(), fragment.size()); scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch); *record = Slice(*scratch);
@ -417,7 +426,8 @@ bool Reader::ReadMore(size_t* drop_size, int *error) {
} }
} }
unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum) {
while (true) { while (true) {
// We need at least the minimum header size // We need at least the minimum header size
if (buffer_.size() < static_cast<size_t>(kHeaderSize)) { if (buffer_.size() < static_cast<size_t>(kHeaderSize)) {
@ -500,6 +510,13 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
} else { } else {
// Uncompress compressed records // Uncompress compressed records
uncompressed_record_.clear(); uncompressed_record_.clear();
if (fragment_checksum != nullptr) {
if (uncompress_hash_state_ == nullptr) {
uncompress_hash_state_ = XXH3_createState();
}
XXH3_64bits_reset(uncompress_hash_state_);
}
size_t uncompressed_size = 0; size_t uncompressed_size = 0;
int remaining = 0; int remaining = 0;
do { do {
@ -511,10 +528,30 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) {
return kBadRecord; return kBadRecord;
} }
if (uncompressed_size > 0) { if (uncompressed_size > 0) {
if (fragment_checksum != nullptr) {
XXH3_64bits_update(uncompress_hash_state_,
uncompressed_buffer_.get(), uncompressed_size);
}
uncompressed_record_.append(uncompressed_buffer_.get(), uncompressed_record_.append(uncompressed_buffer_.get(),
uncompressed_size); uncompressed_size);
} }
} while (remaining > 0 || uncompressed_size == kBlockSize); } while (remaining > 0 || uncompressed_size == kBlockSize);
if (fragment_checksum != nullptr) {
// We can remove this check by updating hash_state_ directly,
// but that requires resetting hash_state_ for full and first types
// for edge cases like consecutive fist type records.
// Leaving the check as is since it is cleaner and can revert to the
// above approach if it causes performance impact.
*fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_);
uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(),
uncompressed_record_.size());
if (*fragment_checksum != actual_checksum) {
// uncompressed_record_ contains bad content that does not match
// actual decompressed content
return kBadRecord;
}
}
*result = Slice(uncompressed_record_); *result = Slice(uncompressed_record_);
return type; return type;
} }

@ -151,8 +151,10 @@ class Reader {
std::unique_ptr<char[]> uncompressed_buffer_; std::unique_ptr<char[]> uncompressed_buffer_;
// Reusable uncompressed record // Reusable uncompressed record
std::string uncompressed_record_; std::string uncompressed_record_;
// Used for stream hashing log record // Used for stream hashing fragment content in ReadRecord()
XXH3_state_t* hash_state_; XXH3_state_t* hash_state_;
// Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
XXH3_state_t* uncompress_hash_state_;
// Extend record types with the following special values // Extend record types with the following special values
enum { enum {
@ -173,7 +175,11 @@ class Reader {
}; };
// Return type, or one of the preceding special values // Return type, or one of the preceding special values
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size); // If WAL compressioned is enabled, fragment_checksum is the checksum of the
// fragment computed from the orginal buffer containinng uncompressed
// fragment.
unsigned int ReadPhysicalRecord(Slice* result, size_t* drop_size,
uint64_t* fragment_checksum = nullptr);
// Read some more // Read some more
bool ReadMore(size_t* drop_size, int *error); bool ReadMore(size_t* drop_size, int *error);

@ -194,8 +194,17 @@ class LogTest
std::string scratch; std::string scratch;
Slice record; Slice record;
bool ret = false; bool ret = false;
ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode); uint64_t record_checksum;
ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode,
&record_checksum);
if (ret) { if (ret) {
if (!allow_retry_read_) {
// allow_retry_read_ means using FragmentBufferedReader which does not
// support record checksum yet.
uint64_t actual_record_checksum =
XXH3_64bits(record.data(), record.size());
assert(actual_record_checksum == record_checksum);
}
return record.ToString(); return record.ToString();
} else { } else {
return "EOF"; return "EOF";

Loading…
Cancel
Save