Checksum properties block for block-based table (#4956)

Summary:
Always enable properties block checksum verification for block-based tables. For an external SST file ingested with 'write_global_seqno==true', we parse the properties block with a decoder that reports failure to the caller ('CheckAndDecodeEntry' in this change) instead of asserting, so the process will not die on an assertion failure that may be caused by corruption.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4956

Differential Revision: D14012741

Pulled By: riversand963

fbshipit-source-id: 8b766e6f54b36f8f9e074c0e19e0926ec3cce186
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent 5d9a623e2c
commit 2d049ab7e8
Changed files (lines changed):

  1. HISTORY.md (1)
  2. db/external_sst_file_basic_test.cc (60)
  3. table/block.cc (63)
  4. table/block.h (11)
  5. table/block_based_table_builder.cc (12)
  6. table/block_based_table_reader.cc (68)
  7. table/meta_blocks.cc (35)
  8. table/meta_blocks.h (4)

HISTORY.md
@@ -9,6 +9,7 @@
* Add support for block checksum verification for external SST files before ingestion.
* Add a placeholder in the manifest which indicates a record from the future that can be safely ignored.
* Add support for trace sampling.
* Enable properties block checksum verification for block-based tables.
### Public API Change
* Disallow CompactionFilter::IgnoreSnapshots() = false, because it is not very useful and the behavior is confusing. The filter will filter everything if there is no snapshot declared by the time the compaction starts. However, users can define a snapshot after the compaction starts and before it finishes and this new snapshot won't be repeatable, because after the compaction finishes, some keys may be dropped.

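The ingestion-related HISTORY entry above is exercised through the public API. A minimal usage sketch (not part of this diff; the DB handle and file path are placeholders), matching the options used in the test below:

    #include <string>
    #include "rocksdb/db.h"
    #include "rocksdb/options.h"

    // Ingest an externally written SST file and ask RocksDB to verify block
    // checksums, including the properties block, before the file is added.
    // On a checksum mismatch, ingestion returns a non-OK status.
    rocksdb::Status IngestWithVerification(rocksdb::DB* db,
                                           const std::string& sst_path) {
      rocksdb::IngestExternalFileOptions ifo;
      ifo.write_global_seqno = true;              // global seqno may be rewritten in place
      ifo.verify_checksums_before_ingest = true;  // verify before ingesting
      return db->IngestExternalFile({sst_path}, ifo);
    }

The test added in this commit drives exactly this path and expects a non-OK status once a byte of the properties block has been corrupted.
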
db/external_sst_file_basic_test.cc
@@ -859,6 +859,66 @@ TEST_P(ExternalSSTFileBasicTest, IngestFileWithFirstByteTampered) {
} while (ChangeOptionsForFileIngestionTest());
}
TEST_P(ExternalSSTFileBasicTest, IngestExternalFileWithCorruptedPropsBlock) {
bool verify_checksums_before_ingest = std::get<1>(GetParam());
if (!verify_checksums_before_ingest) {
return;
}
uint64_t props_block_offset = 0;
size_t props_block_size = 0;
const auto& get_props_block_offset = [&](void* arg) {
props_block_offset = *reinterpret_cast<uint64_t*>(arg);
};
const auto& get_props_block_size = [&](void* arg) {
props_block_size = *reinterpret_cast<uint64_t*>(arg);
};
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
get_props_block_offset);
SyncPoint::GetInstance()->SetCallBack(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
get_props_block_size);
SyncPoint::GetInstance()->EnableProcessing();
int file_id = 0;
Random64 rand(time(nullptr));
do {
std::string file_path = sst_files_dir_ + ToString(file_id++);
Options options = CurrentOptions();
SstFileWriter sst_file_writer(EnvOptions(), options);
Status s = sst_file_writer.Open(file_path);
ASSERT_OK(s);
for (int i = 0; i != 100; ++i) {
std::string key = Key(i);
std::string value = Key(i) + ToString(0);
ASSERT_OK(sst_file_writer.Put(key, value));
}
ASSERT_OK(sst_file_writer.Finish());
{
std::unique_ptr<RandomRWFile> rwfile;
ASSERT_OK(env_->NewRandomRWFile(file_path, &rwfile, EnvOptions()));
// Manually corrupt the file
ASSERT_GT(props_block_size, 8);
uint64_t offset =
props_block_offset + rand.Next() % (props_block_size - 8);
char scratch[8] = {0};
Slice buf;
ASSERT_OK(rwfile->Read(offset, sizeof(scratch), &buf, scratch));
scratch[0] ^= 0xff; // invert one byte to corrupt the block
ASSERT_OK(rwfile->Write(offset, buf));
}
// Ingest file.
IngestExternalFileOptions ifo;
ifo.write_global_seqno = std::get<0>(GetParam());
ifo.verify_checksums_before_ingest = true;
s = db_->IngestExternalFile({file_path}, ifo);
ASSERT_NOK(s);
} while (ChangeOptionsForFileIngestionTest());
}
INSTANTIATE_TEST_CASE_P(ExternalSSTFileBasicTest, ExternalSSTFileBasicTest,
testing::Values(std::make_tuple(true, true),
std::make_tuple(true, false),

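The test above finds the properties block through the two sync points added to BlockBasedTableBuilder::WritePropertiesBlock (see the builder hunk further down) and then corrupts a single byte inside it. A standalone sketch of that corruption step, assuming the same Env/RandomRWFile interfaces the test uses:

    #include <cstdint>
    #include <memory>
    #include <string>
    #include "rocksdb/env.h"
    #include "rocksdb/slice.h"
    #include "rocksdb/status.h"

    // Hypothetical helper mirroring the test body: read one byte at `offset`,
    // invert all of its bits, and write it back, corrupting whatever block
    // covers that offset.
    rocksdb::Status CorruptOneByte(rocksdb::Env* env, const std::string& path,
                                   uint64_t offset) {
      std::unique_ptr<rocksdb::RandomRWFile> rwfile;
      rocksdb::Status s =
          env->NewRandomRWFile(path, &rwfile, rocksdb::EnvOptions());
      if (!s.ok()) {
        return s;
      }
      char scratch[1] = {0};
      rocksdb::Slice buf;
      s = rwfile->Read(offset, sizeof(scratch), &buf, scratch);
      if (!s.ok()) {
        return s;
      }
      char corrupted = static_cast<char>(buf[0] ^ 0xff);  // invert the byte
      return rwfile->Write(offset, rocksdb::Slice(&corrupted, 1));
    }

Note that the test keeps the random offset at least 8 bytes away from the end of the properties block, so its 8-byte read never runs past the block.
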
table/block.cc
@@ -63,6 +63,39 @@ struct DecodeEntry {
}
};
// Helper routine: similar to DecodeEntry but does not have assertions.
// Instead, returns nullptr so that caller can detect and report failure.
struct CheckAndDecodeEntry {
inline const char* operator()(const char* p, const char* limit,
uint32_t* shared, uint32_t* non_shared,
uint32_t* value_length) {
// We need 2 bytes for shared and non_shared size. We also need one more
// byte either for value size or the actual value in case of value delta
// encoding.
if (limit - p < 3) {
return nullptr;
}
*shared = reinterpret_cast<const unsigned char*>(p)[0];
*non_shared = reinterpret_cast<const unsigned char*>(p)[1];
*value_length = reinterpret_cast<const unsigned char*>(p)[2];
if ((*shared | *non_shared | *value_length) < 128) {
// Fast path: all three values are encoded in one byte each
p += 3;
} else {
if ((p = GetVarint32Ptr(p, limit, shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, non_shared)) == nullptr) return nullptr;
if ((p = GetVarint32Ptr(p, limit, value_length)) == nullptr) {
return nullptr;
}
}
if (static_cast<uint32_t>(limit - p) < (*non_shared + *value_length)) {
return nullptr;
}
return p;
}
};
struct DecodeKey {
inline const char* operator()(const char* p, const char* limit,
uint32_t* shared, uint32_t* non_shared) {
@@ -96,7 +129,12 @@ struct DecodeKeyV4 {
void DataBlockIter::Next() {
assert(Valid());
ParseNextDataKey();
ParseNextDataKey<DecodeEntry>();
}
void DataBlockIter::NextOrReport() {
assert(Valid());
ParseNextDataKey<CheckAndDecodeEntry>();
}
void IndexBlockIter::Next() {
@@ -179,7 +217,7 @@ void DataBlockIter::Prev() {
SeekToRestartPoint(restart_index_);
do {
if (!ParseNextDataKey()) {
if (!ParseNextDataKey<DecodeEntry>()) {
break;
}
Slice current_key = key();
@@ -218,7 +256,7 @@ void DataBlockIter::Seek(const Slice& target) {
// Linear search (within restart block) for first key >= target
while (true) {
if (!ParseNextDataKey() || Compare(key_, seek_key) >= 0) {
if (!ParseNextDataKey<DecodeEntry>() || Compare(key_, seek_key) >= 0) {
return;
}
}
@@ -297,7 +335,7 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
//
// TODO(fwu): check the left and right boundary of the restart interval
// to avoid linear seek a target key that is out of range.
if (!ParseNextDataKey(limit) || Compare(key_, target) >= 0) {
if (!ParseNextDataKey<DecodeEntry>(limit) || Compare(key_, target) >= 0) {
// we stop at the first potential matching user key.
break;
}
@@ -391,7 +429,7 @@ void DataBlockIter::SeekForPrev(const Slice& target) {
SeekToRestartPoint(index);
// Linear search (within restart block) for first key >= seek_key
while (ParseNextDataKey() && Compare(key_, seek_key) < 0) {
while (ParseNextDataKey<DecodeEntry>() && Compare(key_, seek_key) < 0) {
}
if (!Valid()) {
SeekToLast();
@@ -407,7 +445,15 @@ void DataBlockIter::SeekToFirst() {
return;
}
SeekToRestartPoint(0);
ParseNextDataKey();
ParseNextDataKey<DecodeEntry>();
}
void DataBlockIter::SeekToFirstOrReport() {
if (data_ == nullptr) { // Not init yet
return;
}
SeekToRestartPoint(0);
ParseNextDataKey<CheckAndDecodeEntry>();
}
void IndexBlockIter::SeekToFirst() {
@@ -423,7 +469,7 @@ void DataBlockIter::SeekToLast() {
return;
}
SeekToRestartPoint(num_restarts_ - 1);
while (ParseNextDataKey() && NextEntryOffset() < restarts_) {
while (ParseNextDataKey<DecodeEntry>() && NextEntryOffset() < restarts_) {
// Keep skipping
}
}
@@ -447,6 +493,7 @@ void BlockIter<TValue>::CorruptionError() {
value_.clear();
}
template <typename DecodeEntryFunc>
bool DataBlockIter::ParseNextDataKey(const char* limit) {
current_ = NextEntryOffset();
const char* p = data_ + current_;
@@ -463,7 +510,7 @@ bool DataBlockIter::ParseNextDataKey(const char* limit) {
// Decode next entry
uint32_t shared, non_shared, value_length;
p = DecodeEntry()(p, limit, &shared, &non_shared, &value_length);
p = DecodeEntryFunc()(p, limit, &shared, &non_shared, &value_length);
if (p == nullptr || key_.Size() < shared) {
CorruptionError();
return false;

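For context on the entry format that DecodeEntry and CheckAndDecodeEntry parse, here is a hypothetical encoder (not part of RocksDB) for one entry of a restart interval, assuming no value delta encoding: three varint32 headers, then the non-shared key bytes, then the value bytes.

    #include <string>
    #include "rocksdb/slice.h"
    #include "util/coding.h"  // PutVarint32 (internal RocksDB header)

    // Append one block entry: shared key length, non-shared key length and
    // value length as varint32, followed by the key suffix and the value.
    void AppendBlockEntry(std::string* out, uint32_t shared,
                          const rocksdb::Slice& key_suffix,
                          const rocksdb::Slice& value) {
      rocksdb::PutVarint32(out, shared);
      rocksdb::PutVarint32(out, static_cast<uint32_t>(key_suffix.size()));
      rocksdb::PutVarint32(out, static_cast<uint32_t>(value.size()));
      out->append(key_suffix.data(), key_suffix.size());
      out->append(value.data(), value.size());
    }

CheckAndDecodeEntry accepts exactly this layout but returns nullptr on a truncated or malformed header, which ParseNextDataKey turns into CorruptionError() and a non-OK iterator status instead of a failed assertion.
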
table/block.h
@@ -395,8 +395,18 @@ class DataBlockIter final : public BlockIter<Slice> {
virtual void Next() override;
// Try to advance to the next entry in the block. If there is data corruption
// or error, report it to the caller instead of aborting the process. May
// incur higher CPU overhead because we need to perform check on every entry.
void NextOrReport();
virtual void SeekToFirst() override;
// Try to seek to the first entry in the block. If there is data corruption
// or error, report it to the caller instead of aborting the process. May incur
// higher CPU overhead because we need to perform check on every entry.
void SeekToFirstOrReport();
virtual void SeekToLast() override;
void Invalidate(Status s) {
@@ -439,6 +449,7 @@ class DataBlockIter final : public BlockIter<Slice> {
DataBlockHashIndex* data_block_hash_index_;
const Comparator* user_comparator_;
template <typename DecodeEntryFunc>
inline bool ParseNextDataKey(const char* limit = nullptr);
inline int Compare(const IterKey& ikey, const Slice& b) const {

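The block.h comments above describe the contract of the reporting variants; the sketch below shows the calling pattern they are meant for (it mirrors the ReadProperties loop in the meta_blocks.cc hunk further down). ScanOrReport is a hypothetical helper over an already-initialized DataBlockIter:

    #include "rocksdb/status.h"
    #include "table/block.h"  // DataBlockIter (internal RocksDB header)

    // Walk every entry using the reporting variants; corruption surfaces
    // through iter->status() rather than an assertion failure.
    rocksdb::Status ScanOrReport(rocksdb::DataBlockIter* iter) {
      rocksdb::Status s;
      for (iter->SeekToFirstOrReport(); iter->Valid(); iter->NextOrReport()) {
        s = iter->status();
        if (!s.ok()) {
          break;
        }
        // ... consume iter->key() and iter->value() here ...
      }
      return s;
    }
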
table/block_based_table_builder.cc
@@ -853,6 +853,18 @@ void BlockBasedTableBuilder::WritePropertiesBlock(
&properties_block_handle);
}
if (ok()) {
#ifndef NDEBUG
{
uint64_t props_block_offset = properties_block_handle.offset();
uint64_t props_block_size = properties_block_handle.size();
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockOffset",
&props_block_offset);
TEST_SYNC_POINT_CALLBACK(
"BlockBasedTableBuilder::WritePropertiesBlock:GetPropsBlockSize",
&props_block_size);
}
#endif // !NDEBUG
meta_index_builder->Add(kPropertiesBlock, properties_block_handle);
}
}

table/block_based_table_reader.cc
@@ -46,10 +46,12 @@
#include "monitoring/perf_context_imp.h"
#include "util/coding.h"
#include "util/crc32c.h"
#include "util/file_reader_writer.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/sync_point.h"
#include "util/xxhash.h"
namespace rocksdb {
@@ -919,6 +921,33 @@ Status BlockBasedTable::PrefetchTail(
return s;
}
Status VerifyChecksum(const ChecksumType type, const char* buf, size_t len,
uint32_t expected) {
Status s;
uint32_t actual = 0;
switch (type) {
case kNoChecksum:
break;
case kCRC32c:
expected = crc32c::Unmask(expected);
actual = crc32c::Value(buf, len);
break;
case kxxHash:
actual = XXH32(buf, static_cast<int>(len), 0);
break;
case kxxHash64:
actual = static_cast<uint32_t>(XXH64(buf, static_cast<int>(len), 0) &
uint64_t{0xffffffff});
break;
default:
s = Status::Corruption("unknown checksum type");
}
if (s.ok() && actual != expected) {
s = Status::Corruption("properties block checksum mismatched");
}
return s;
}
Status BlockBasedTable::ReadPropertiesBlock(
Rep* rep, FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
const SequenceNumber largest_seqno) {
@@ -934,10 +963,45 @@ Status BlockBasedTable::ReadPropertiesBlock(
s = meta_iter->status();
TableProperties* table_properties = nullptr;
if (s.ok()) {
s = ReadProperties(
meta_iter->value(), rep->file.get(), prefetch_buffer, rep->footer,
rep->ioptions, &table_properties, true /* verify_checksum */,
nullptr /* ret_block_handle */, nullptr /* ret_block_contents */,
false /* compression_type_missing */, nullptr /* memory_allocator */);
}
if (s.IsCorruption()) {
// If this is an external SST file ingested with write_global_seqno set to
// true, then we expect the checksum mismatch because checksum was written
// by SstFileWriter, but its global seqno in the properties block may have
// been changed during ingestion. In this case, we read the properties
// block, copy it to a memory buffer, change the global seqno to its
// original value, i.e. 0, and verify the checksum again.
BlockHandle props_block_handle;
CacheAllocationPtr tmp_buf;
s = ReadProperties(meta_iter->value(), rep->file.get(), prefetch_buffer,
rep->footer, rep->ioptions, &table_properties,
false /* verify_checksum */, &props_block_handle,
&tmp_buf, false /* compression_type_missing */,
nullptr /* memory_allocator */);
if (s.ok() && tmp_buf) {
const auto seqno_pos_iter = table_properties->properties_offsets.find(
ExternalSstFilePropertyNames::kGlobalSeqno);
size_t block_size = props_block_handle.size();
if (seqno_pos_iter != table_properties->properties_offsets.end()) {
uint64_t global_seqno_offset = seqno_pos_iter->second;
EncodeFixed64(
tmp_buf.get() + global_seqno_offset - props_block_handle.offset(),
0);
}
uint32_t value = DecodeFixed32(tmp_buf.get() + block_size + 1);
s = rocksdb::VerifyChecksum(rep->footer.checksum(), tmp_buf.get(),
block_size + 1, value);
}
}
std::unique_ptr<TableProperties> props_guard;
if (table_properties != nullptr) {
props_guard.reset(table_properties);
}
if (!s.ok()) {
@@ -947,7 +1011,7 @@ Status BlockBasedTable::ReadPropertiesBlock(
s.ToString().c_str());
} else {
assert(table_properties != nullptr);
rep->table_properties.reset(table_properties);
rep->table_properties.reset(props_guard.release());
rep->blocks_maybe_compressed = rep->table_properties->compression_name !=
CompressionTypeToString(kNoCompression);
rep->blocks_definitely_zstd_compressed =

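The comment in ReadPropertiesBlock above describes the retry in prose; the sketch below spells out the block framing it relies on. On disk, the properties block occupies handle.size() bytes, followed by a 5-byte trailer (a 1-byte compression type and a 4-byte checksum), and the checksum covers the contents plus the compression-type byte. RecheckWithZeroSeqno is a hypothetical helper that mirrors the code above; VerifyChecksum is the file-local helper added in this diff, so the sketch assumes it is visible:

    #include <cstdint>
    #include "rocksdb/status.h"
    #include "rocksdb/table.h"  // ChecksumType
    #include "util/coding.h"    // EncodeFixed64, DecodeFixed32 (internal headers)

    // `buf` holds the raw properties block plus its trailer, `block_size` is
    // handle.size() (contents only), and `seqno_offset` is the offset of the
    // 8-byte global seqno field within the block.
    rocksdb::Status RecheckWithZeroSeqno(char* buf, size_t block_size,
                                         size_t seqno_offset,
                                         rocksdb::ChecksumType type) {
      // Undo the in-place global seqno rewrite done at ingestion time.
      rocksdb::EncodeFixed64(buf + seqno_offset, 0);
      // The stored checksum sits right after the 1-byte compression type.
      uint32_t stored = rocksdb::DecodeFixed32(buf + block_size + 1);
      // Recompute over contents + compression type, as the writer did.
      return rocksdb::VerifyChecksum(type, buf, block_size + 1, stored);
    }
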
table/meta_blocks.cc
@@ -174,7 +174,9 @@ bool NotifyCollectTableCollectorsOnFinish(
Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ImmutableCFOptions& ioptions,
TableProperties** table_properties,
TableProperties** table_properties, bool verify_checksum,
BlockHandle* ret_block_handle,
CacheAllocationPtr* verification_buf,
bool /*compression_type_missing*/,
MemoryAllocator* memory_allocator) {
assert(table_properties);
@@ -187,7 +189,7 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
BlockContents block_contents;
ReadOptions read_options;
read_options.verify_checksums = false;
read_options.verify_checksums = verify_checksum;
Status s;
PersistentCacheOptions cache_options;
@@ -248,16 +250,19 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
};
std::string last_key;
for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
for (iter.SeekToFirstOrReport(); iter.Valid(); iter.NextOrReport()) {
s = iter.status();
if (!s.ok()) {
break;
}
auto key = iter.key().ToString();
// properties block is strictly sorted with no duplicate key.
assert(last_key.empty() ||
BytewiseComparator()->Compare(key, last_key) > 0);
// properties block should be strictly sorted with no duplicate key.
if (!last_key.empty() &&
BytewiseComparator()->Compare(key, last_key) <= 0) {
s = Status::Corruption("properties unsorted");
break;
}
last_key = key;
auto raw_val = iter.value();
@@ -306,6 +311,16 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
}
if (s.ok()) {
*table_properties = new_table_properties;
if (ret_block_handle != nullptr) {
*ret_block_handle = handle;
}
if (verification_buf != nullptr) {
size_t len = handle.size() + kBlockTrailerSize;
*verification_buf = rocksdb::AllocateBlock(len, memory_allocator);
if (verification_buf->get() != nullptr) {
memcpy(verification_buf->get(), block_contents.data.data(), len);
}
}
} else {
delete new_table_properties;
}
@@ -359,9 +374,11 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size,
TableProperties table_properties;
if (found_properties_block == true) {
s = ReadProperties(meta_iter->value(), file, nullptr /* prefetch_buffer */,
footer, ioptions, properties, compression_type_missing,
memory_allocator);
s = ReadProperties(
meta_iter->value(), file, nullptr /* prefetch_buffer */, footer,
ioptions, properties, false /* verify_checksum */,
nullptr /* ret_block_handle */, nullptr /* ret_block_contents */,
compression_type_missing, memory_allocator);
} else {
s = Status::NotFound();
}

table/meta_blocks.h
@@ -96,7 +96,9 @@ bool NotifyCollectTableCollectorsOnFinish(
Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
FilePrefetchBuffer* prefetch_buffer, const Footer& footer,
const ImmutableCFOptions& ioptions,
TableProperties** table_properties,
TableProperties** table_properties, bool verify_checksum,
BlockHandle* block_handle,
CacheAllocationPtr* verification_buf,
bool compression_type_missing = false,
MemoryAllocator* memory_allocator = nullptr);
