diff --git a/HISTORY.md b/HISTORY.md index 96f8d5200..cbc0615ca 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -9,6 +9,7 @@ * Add support for block checksums verification for external SST files before ingestion. * Add a place holder in manifest which indicate a record from future that can be safely ignored. * Add support for trace sampling. +* Enable properties block checksum verification for block-based tables. ### Public API Change * Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped. diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index bcbb3d546..0005f84ee 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -859,6 +859,66 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithFirstByteTampered) { } while (ChangeOptionsForFileIngestionTest()); } +TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) { + bool verify_checksums_before_ingest = std::get<1>(GetParam()); + if (!verify_checksums_before_ingest) { + return; + } + uint64_t props_block_offset = 0; + size_t props_block_size = 0; + const auto& get_props_block_offset = [&](void* arg) { + props_block_offset = *reinterpret_cast(arg); + }; + const auto& get_props_block_size = [&](void* arg) { + props_block_size = *reinterpret_cast(arg); + }; + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); + SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset", + get_props_block_offset); + SyncPoint::GetInstance()->SetCallBack( + "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize", + get_props_block_size); + SyncPoint::GetInstance()->EnableProcessing(); + int file_id = 0; + Random64 rand(time(nullptr)); + do { + std::string file_path = sst_files_dir_ + ToString(file_id++); + Options options = CurrentOptions(); + SstFileWriter sst_file_writer(EnvOptions(), options); + Status s = sst_file_writer.Open(file_path); + ASSERT_OK(s); + for (int i = 0; i != 100; ++i) { + std::string key = Key(i); + std::string value = Key(i) + ToString(0); + ASSERT_OK(sst_file_writer.Put(key, value)); + } + ASSERT_OK(sst_file_writer.Finish()); + + { + std::unique_ptr rwfile; + ASSERT_OK(env_->NewRandomRWFile(file_path, &rwfile, EnvOptions())); + // Manually corrupt the file + ASSERT_GT(props_block_size, 8); + uint64_t offset = + props_block_offset + rand.Next() % (props_block_size - 8); + char scratch[8] = {0}; + Slice buf; + ASSERT_OK(rwfile->Read(offset, sizeof(scratch), &buf, scratch)); + scratch[0] ^= 0xff; // flip one bit + ASSERT_OK(rwfile->Write(offset, buf)); + } + + // Ingest file. + IngestExternalFileOptions ifo; + ifo.write_global_seqno = std::get<0>(GetParam()); + ifo.verify_checksums_before_ingest = true; + s = db_->IngestExternalFile({file_path}, ifo); + ASSERT_NOK(s); + } while (ChangeOptionsForFileIngestionTest()); +} + INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest, testing::Values(std::make_tuple(true, true), std::make_tuple(true, false), diff --git a/table/block.cc b/table/block.cc index 4e8d6e5ca..ab1e390ee 100644 --- a/table/block.cc +++ b/table/block.cc @@ -63,6 +63,39 @@ struct DecodeEntry { } }; +// Helper routine: similar to DecodeEntry but does not have assertions. +// Instead, returns nullptr so that caller can detect and report failure. +struct CheckAndDecodeEntry { + inline const char* operator()(const char* p, const char* limit, + uint32_t* shared, uint32_t* non_shared, + uint32_t* value_length) { + // We need 2 bytes for shared and non_shared size. We also need one more + // byte either for value size or the actual value in case of value delta + // encoding. + if (limit - p < 3) { + return nullptr; + } + *shared = reinterpret_cast(p)[0]; + *non_shared = reinterpret_cast(p)[1]; + *value_length = reinterpret_cast(p)[2]; + if ((*shared | *non_shared | *value_length) < 128) { + // Fast path: all three values are encoded in one byte each + p += 3; + } else { + if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr; + if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr; + if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) { + return nullptr; + } + } + + if (static_cast(limit - p) < (*non_shared + *value_length)) { + return nullptr; + } + return p; + } +}; + struct DecodeKey { inline const char* operator()(const char* p, const char* limit, uint32_t* shared, uint32_t* non_shared) { @@ -96,7 +129,12 @@ struct DecodeKeyV4 { void DataBlockIter::Next() { assert(Valid()); - ParseNextDataKey(); + ParseNextDataKey(); +} + +void DataBlockIter::NextOrReport() { + assert(Valid()); + ParseNextDataKey(); } void IndexBlockIter::Next() { @@ -179,7 +217,7 @@ void DataBlockIter::Prev() { SeekToRestartPoint(restart_index_); do { - if (!ParseNextDataKey()) { + if (!ParseNextDataKey()) { break; } Slice current_key = key(); @@ -218,7 +256,7 @@ void DataBlockIter::Seek(const Slice& target) { // Linear search (within restart block) for first key >= target while (true) { - if (!ParseNextDataKey() || Compare(key_, seek_key) >= 0) { + if (!ParseNextDataKey() || Compare(key_, seek_key) >= 0) { return; } } @@ -297,7 +335,7 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) { // // TODO(fwu): check the left and write boundary of the restart interval // to avoid linear seek a target key that is out of range. - if (!ParseNextDataKey(limit) || Compare(key_, target) >= 0) { + if (!ParseNextDataKey(limit) || Compare(key_, target) >= 0) { // we stop at the first potential matching user key. break; } @@ -391,7 +429,7 @@ void DataBlockIter::SeekForPrev(const Slice& target) { SeekToRestartPoint(index); // Linear search (within restart block) for first key >= seek_key - while (ParseNextDataKey() && Compare(key_, seek_key) < 0) { + while (ParseNextDataKey() && Compare(key_, seek_key) < 0) { } if (!Valid()) { SeekToLast(); @@ -407,7 +445,15 @@ void DataBlockIter::SeekToFirst() { return; } SeekToRestartPoint(0); - ParseNextDataKey(); + ParseNextDataKey(); +} + +void DataBlockIter::SeekToFirstOrReport() { + if (data_ == nullptr) { // Not init yet + return; + } + SeekToRestartPoint(0); + ParseNextDataKey(); } void IndexBlockIter::SeekToFirst() { @@ -423,7 +469,7 @@ void DataBlockIter::SeekToLast() { return; } SeekToRestartPoint(num_restarts_ - 1); - while (ParseNextDataKey() && NextEntryOffset() < restarts_) { + while (ParseNextDataKey() && NextEntryOffset() < restarts_) { // Keep skipping } } @@ -447,6 +493,7 @@ void BlockIter::CorruptionError() { value_.clear(); } +template bool DataBlockIter::ParseNextDataKey(const char* limit) { current_ = NextEntryOffset(); const char* p = data_ + current_; @@ -463,7 +510,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) { // Decode next entry uint32_t shared, non_shared, value_length; - p = DecodeEntry()(p, limit, &shared, &non_shared, &value_length); + p = DecodeEntryFunc()(p, limit, &shared, &non_shared, &value_length); if (p == nullptr || key_.Size() < shared) { CorruptionError(); return false; diff --git a/table/block.h b/table/block.h index 1a8073203..933058ee8 100644 --- a/table/block.h +++ b/table/block.h @@ -395,8 +395,18 @@ class DataBlockIter final : public BlockIter { virtual void Next() override; + // Try to advance to the next entry in the block. If there is data corruption + // or error, report it to the caller instead of aborting the process. May + // incur higher CPU overhead because we need to perform check on every entry. + void NextOrReport(); + virtual void SeekToFirst() override; + // Try to seek to the first entry in the block. If there is data corruption + // or error, report it to caller instead of aborting the process. May incur + // higher CPU overhead because we need to perform check on every entry. + void SeekToFirstOrReport(); + virtual void SeekToLast() override; void Invalidate(Status s) { @@ -439,6 +449,7 @@ class DataBlockIter final : public BlockIter { DataBlockHashIndex* data_block_hash_index_; const Comparator* user_comparator_; + template inline bool ParseNextDataKey(const char* limit = nullptr); inline int Compare(const IterKey& ikey, const Slice& b) const { diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 1e50866f6..acc0c85f5 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -853,6 +853,18 @@ void BlockBasedTableBuilder::WritePropertiesBlock( &properties_block_handle); } if (ok()) { +#ifndef NDEBUG + { + uint64_t props_block_offset = properties_block_handle.offset(); + uint64_t props_block_size = properties_block_handle.size(); + TEST_SYNC_POINT_CALLBACK( + "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset", + &props_block_offset); + TEST_SYNC_POINT_CALLBACK( + "BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize", + &props_block_size); + } +#endif // !NDEBUG meta_index_builder->Add(kPropertiesBlock, properties_block_handle); } } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index b7757ee06..d7d823025 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -46,10 +46,12 @@ #include "monitoring/perf_context_imp.h" #include "util/coding.h" +#include "util/crc32c.h" #include "util/file_reader_writer.h" #include "util/stop_watch.h" #include "util/string_util.h" #include "util/sync_point.h" +#include "util/xxhash.h" namespace rocksdb { @@ -919,6 +921,33 @@ Status BlockBasedTable::PrefetchTail( return s; } +Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len, + uint32_t expected) { + Status s; + uint32_t actual = 0; + switch (type) { + case kNoChecksum: + break; + case kCRC32c: + expected = crc32c::Unmask(expected); + actual = crc32c::Value(buf, len); + break; + case kxxHash: + actual = XXH32(buf, static_cast(len), 0); + break; + case kxxHash64: + actual = static_cast(XXH64(buf, static_cast(len), 0) & + uint64_t{0xffffffff}); + break; + default: + s = Status::Corruption("unknown checksum type"); + } + if (s.ok() && actual != expected) { + s = Status::Corruption("properties block checksum mismatched"); + } + return s; +} + Status BlockBasedTable::ReadPropertiesBlock( Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter, const SequenceNumber largest_seqno) { @@ -934,10 +963,45 @@ Status BlockBasedTable::ReadPropertiesBlock( s = meta_iter->status(); TableProperties* table_properties = nullptr; if (s.ok()) { + s = ReadProperties( + meta_iter->value(), rep->file.get(), prefetch_buffer, rep->footer, + rep->ioptions, &table_properties, true /* verify_checksum */, + nullptr /* ret_block_handle */, nullptr /* ret_block_contents */, + false /* compression_type_missing */, nullptr /* memory_allocator */); + } + + if (s.IsCorruption()) { + // If this is an external SST file ingested with write_global_seqno set to + // true, then we expect the checksum mismatch because checksum was written + // by SstFileWriter, but its global seqno in the properties block may have + // been changed during ingestion. In this case, we read the properties + // block, copy it to a memory buffer, change the global seqno to its + // original value, i.e. 0, and verify the checksum again. + BlockHandle props_block_handle; + CacheAllocationPtr tmp_buf; s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer, rep->footer, rep->ioptions, &table_properties, - false /* compression_type_missing */, + false /* verify_checksum */, &props_block_handle, + &tmp_buf, false /* compression_type_missing */, nullptr /* memory_allocator */); + if (s.ok() && tmp_buf) { + const auto seqno_pos_iter = table_properties->properties_offsets.find( + ExternalSstFilePropertyNames::kGlobalSeqno); + size_t block_size = props_block_handle.size(); + if (seqno_pos_iter != table_properties->properties_offsets.end()) { + uint64_t global_seqno_offset = seqno_pos_iter->second; + EncodeFixed64( + tmp_buf.get() + global_seqno_offset - props_block_handle.offset(), + 0); + } + uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1); + s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(), + block_size + 1, value); + } + } + std::unique_ptr props_guard; + if (table_properties != nullptr) { + props_guard.reset(table_properties); } if (!s.ok()) { @@ -947,7 +1011,7 @@ Status BlockBasedTable::ReadPropertiesBlock( s.ToString().c_str()); } else { assert(table_properties != nullptr); - rep->table_properties.reset(table_properties); + rep->table_properties.reset(props_guard.release()); rep->blocks_maybe_compressed = rep->table_properties->compression_name != CompressionTypeToString(kNoCompression); rep->blocks_definitely_zstd_compressed = diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index 744091fbc..d8e241d0a 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -174,7 +174,9 @@ bool NotifyCollectTableCollectorsOnFinish( Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, - TableProperties** table_properties, + TableProperties** table_properties, bool verify_checksum, + BlockHandle* ret_block_handle, + CacheAllocationPtr* verification_buf, bool /*compression_type_missing*/, MemoryAllocator* memory_allocator) { assert(table_properties); @@ -187,7 +189,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, BlockContents block_contents; ReadOptions read_options; - read_options.verify_checksums = false; + read_options.verify_checksums = verify_checksum; Status s; PersistentCacheOptions cache_options; @@ -248,16 +250,19 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, }; std::string last_key; - for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { + for (iter.SeekToFirstOrReport(); iter.Valid(); iter.NextOrReport()) { s = iter.status(); if (!s.ok()) { break; } auto key = iter.key().ToString(); - // properties block is strictly sorted with no duplicate key. - assert(last_key.empty() || - BytewiseComparator()->Compare(key, last_key) > 0); + // properties block should be strictly sorted with no duplicate key. + if (!last_key.empty() && + BytewiseComparator()->Compare(key, last_key) <= 0) { + s = Status::Corruption("properties unsorted"); + break; + } last_key = key; auto raw_val = iter.value(); @@ -306,6 +311,16 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, } if (s.ok()) { *table_properties = new_table_properties; + if (ret_block_handle != nullptr) { + *ret_block_handle = handle; + } + if (verification_buf != nullptr) { + size_t len = handle.size() + kBlockTrailerSize; + *verification_buf = rocksdb::AllocateBlock(len, memory_allocator); + if (verification_buf->get() != nullptr) { + memcpy(verification_buf->get(), block_contents.data.data(), len); + } + } } else { delete new_table_properties; } @@ -359,9 +374,11 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, TableProperties table_properties; if (found_properties_block == true) { - s = ReadProperties(meta_iter->value(), file, nullptr /* prefetch_buffer */, - footer, ioptions, properties, compression_type_missing, - memory_allocator); + s = ReadProperties( + meta_iter->value(), file, nullptr /* prefetch_buffer */, footer, + ioptions, properties, false /* verify_checksum */, + nullptr /* ret_block_hanel */, nullptr /* ret_block_contents */, + compression_type_missing, memory_allocator); } else { s = Status::NotFound(); } diff --git a/table/meta_blocks.h b/table/meta_blocks.h index 1c8fe686c..14ea6916a 100644 --- a/table/meta_blocks.h +++ b/table/meta_blocks.h @@ -96,7 +96,9 @@ bool NotifyCollectTableCollectorsOnFinish( Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, FilePrefetchBuffer* prefetch_buffer, const Footer& footer, const ImmutableCFOptions& ioptions, - TableProperties** table_properties, + TableProperties** table_properties, bool verify_checksum, + BlockHandle* block_handle, + CacheAllocationPtr* verification_buf, bool compression_type_missing = false, MemoryAllocator* memory_allocator = nullptr);