Add a check mode to verify that compressed blocks can be decompressed back

Summary:
When the verify_compression flag is set, try to decompress each block right after compressing it and compare the result to the original input.
Assert, and crash in debug builds, if the just-compressed block cannot be decompressed back to the input.

Test Plan: Run unit-tests.

Reviewers: dhruba, andrewkr, sdong, IslamAbdelRahman

Reviewed By: IslamAbdelRahman

Subscribers: andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D59145
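
For context, the new flag is a field on BlockBasedTableOptions. A minimal sketch of how a user would turn the check on (the function name below is made up; NewBlockBasedTableFactory and the table_factory plumbing are standard RocksDB, but this snippet is illustrative and not part of the diff):

#include "rocksdb/options.h"
#include "rocksdb/table.h"

rocksdb::Options MakeVerifyingOptions() {
  rocksdb::BlockBasedTableOptions table_options;
  // Re-decompress every block right after compressing it and compare the
  // result against the original input; a mismatch surfaces as a Corruption
  // status from the table builder.
  table_options.verify_compression = true;

  rocksdb::Options options;
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}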
Branch: main
Author: Nadav Rotem
Parent: 2a79af1c54
Commit: 7360db39e6
 include/rocksdb/table.h            |  5
 table/block_based_table_builder.cc | 36
 table/format.cc                    | 36
 table/format.h                     |  8
 util/options_helper.h              |  5
 util/options_settable_test.cc      |  3
 6 files changed

--- a/include/rocksdb/table.h
+++ b/include/rocksdb/table.h
@@ -161,6 +161,11 @@ struct BlockBasedTableOptions {
   // Default: false
   bool skip_table_builder_flush = false;
 
+  // Verify that decompressing the compressed block gives back the input. This
+  // is a verification mode that we use to detect bugs in compression
+  // algorithms.
+  bool verify_compression = false;
+
   // We currently have three versions:
   // 0 -- This version is currently written out by all RocksDB's versions by
   //    default. Can be read by really old RocksDB's. Doesn't support changing

--- a/table/block_based_table_builder.cc
+++ b/table/block_based_table_builder.cc
@@ -650,6 +650,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
   auto type = r->compression_type;
   Slice block_contents;
+  bool abort_compression = false;
   if (raw_block_contents.size() < kCompressionSizeLimit) {
     Slice compression_dict;
     if (is_data_block && r->compression_dict && r->compression_dict->size()) {
@@ -658,11 +659,46 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
     block_contents = CompressBlock(raw_block_contents, r->compression_opts,
                                    &type, r->table_options.format_version,
                                    compression_dict, &r->compressed_output);
+
+    // Some of the compression algorithms are known to be unreliable. If
+    // the verify_compression flag is set then try to de-compress the
+    // compressed data and compare to the input.
+    if (type != kNoCompression && r->table_options.verify_compression) {
+      // Retrieve the uncompressed contents into a new buffer
+      BlockContents contents;
+      Status stat = UncompressBlockContentsForCompressionType(
+          block_contents.data(), block_contents.size(), &contents,
+          r->table_options.format_version, compression_dict, type);
+
+      if (stat.ok()) {
+        bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
+        if (!compressed_ok) {
+          // The result of the compression was invalid. abort.
+          abort_compression = true;
+          Log(InfoLogLevel::ERROR_LEVEL, r->ioptions.info_log,
+              "Decompressed block did not match raw block");
+          r->status =
+              Status::Corruption("Decompressed block did not match raw block");
+        }
+      } else {
+        // Decompression reported an error. abort.
+        r->status = Status::Corruption("Could not decompress");
+        abort_compression = true;
+      }
+    }
   } else {
+    // Block is too big to be compressed.
+    abort_compression = true;
+  }
+
+  // Abort compression if the block is too big, or did not pass
+  // verification.
+  if (abort_compression) {
     RecordTick(r->ioptions.statistics, NUMBER_BLOCK_NOT_COMPRESSED);
     type = kNoCompression;
     block_contents = raw_block_contents;
   }
+
   WriteRawBlock(block_contents, type, handle);
   r->compressed_output.clear();
 }
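
The check added above is a plain compress-then-decompress round trip, made compression-type agnostic by routing through UncompressBlockContentsForCompressionType. The same idea, sketched outside RocksDB against the Snappy library alone (the helper name and the choice of Snappy are illustrative assumptions, not taken from the diff):

#include <string>

#include <snappy.h>

// Returns true if `input` survives a Snappy compress/uncompress round trip.
bool RoundTripOk(const std::string& input) {
  std::string compressed;
  snappy::Compress(input.data(), input.size(), &compressed);

  std::string decompressed;
  if (!snappy::Uncompress(compressed.data(), compressed.size(),
                          &decompressed)) {
    return false;  // decoder rejected the freshly compressed bytes
  }
  return decompressed == input;  // byte-for-byte match with the original
}

In the builder, the equivalent of the `return false` path sets abort_compression and records a Corruption status, so a misbehaving codec degrades the block to uncompressed storage and fails the build loudly instead of persisting garbage.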

--- a/table/format.cc
+++ b/table/format.cc
@@ -403,20 +403,16 @@ Status ReadBlockContents(RandomAccessFileReader* file, const Footer& footer,
   return status;
 }
 
-//
-// The 'data' points to the raw block contents that was read in from file.
-// This method allocates a new heap buffer and the raw block
-// contents are uncompresed into this buffer. This
-// buffer is returned via 'result' and it is upto the caller to
-// free this buffer.
-// format_version is the block format as defined in include/rocksdb/table.h
-Status UncompressBlockContents(const char* data, size_t n,
-                               BlockContents* contents, uint32_t format_version,
-                               const Slice& compression_dict) {
+Status UncompressBlockContentsForCompressionType(
+    const char* data, size_t n, BlockContents* contents,
+    uint32_t format_version, const Slice& compression_dict,
+    CompressionType compression_type) {
   std::unique_ptr<char[]> ubuf;
+
+  assert(compression_type != kNoCompression && "Invalid compression type");
+
   int decompress_size = 0;
-  assert(data[n] != kNoCompression);
-  switch (data[n]) {
+  switch (compression_type) {
     case kSnappyCompression: {
       size_t ulength = 0;
       static char snappy_corrupt_msg[] =
@@ -509,4 +505,20 @@ Status UncompressBlockContents(const char* data, size_t n,
   return Status::OK();
 }
 
+//
+// The 'data' points to the raw block contents that was read in from file.
+// This method allocates a new heap buffer and the raw block
+// contents are uncompresed into this buffer. This
+// buffer is returned via 'result' and it is upto the caller to
+// free this buffer.
+// format_version is the block format as defined in include/rocksdb/table.h
+Status UncompressBlockContents(const char* data, size_t n,
+                               BlockContents* contents, uint32_t format_version,
+                               const Slice& compression_dict) {
+  assert(data[n] != kNoCompression);
+  return UncompressBlockContentsForCompressionType(
+      data, n, contents, format_version, compression_dict,
+      (CompressionType)data[n]);
+}
+
 }  // namespace rocksdb

--- a/table/format.h
+++ b/table/format.h
@@ -229,6 +229,14 @@ extern Status UncompressBlockContents(const char* data, size_t n,
                                       uint32_t compress_format_version,
                                       const Slice& compression_dict);
 
+// This is an extension to UncompressBlockContents that accepts
+// a specific compression type. This is used by un-wrapped blocks
+// with no compression header.
+extern Status UncompressBlockContentsForCompressionType(
+    const char* data, size_t n, BlockContents* contents,
+    uint32_t compress_format_version, const Slice& compression_dict,
+    CompressionType compression_type);
+
 // Implementation details follow. Clients should ignore,
 
 inline BlockHandle::BlockHandle()
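
A hedged sketch of how a caller that already knows a block's compression type (for example, because the payload carries no trailing compression-type byte) might use the new entry point. Only the signature declared above is taken from the diff; the wrapper name and variables are hypothetical, and the internal header "table/format.h" would need to be on the include path:

#include "table/format.h"

rocksdb::Status DecompressKnownType(const rocksdb::Slice& payload,
                                    rocksdb::CompressionType type,
                                    uint32_t format_version,
                                    const rocksdb::Slice& compression_dict,
                                    rocksdb::BlockContents* result) {
  // Delegate directly. UncompressBlockContents() would instead read the
  // type from the trailing byte at data[n], as shown in format.cc above.
  return rocksdb::UncompressBlockContentsForCompressionType(
      payload.data(), payload.size(), result, format_version,
      compression_dict, type);
}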

--- a/util/options_helper.h
+++ b/util/options_helper.h
@@ -546,7 +546,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
           OptionType::kBoolean, OptionVerificationType::kNormal}},
         {"format_version",
          {offsetof(struct BlockBasedTableOptions, format_version),
-          OptionType::kUInt32T, OptionVerificationType::kNormal}}};
+          OptionType::kUInt32T, OptionVerificationType::kNormal}},
+        {"verify_compression",
+         {offsetof(struct BlockBasedTableOptions, verify_compression),
+          OptionType::kBoolean, OptionVerificationType::kNormal}}};
 
 static std::unordered_map<std::string, OptionTypeInfo> plain_table_type_info = {
     {"user_key_len",

--- a/util/options_settable_test.cc
+++ b/util/options_settable_test.cc
@@ -157,7 +157,8 @@ TEST_F(OptionsSettableTest, BlockBasedTableOptionsAllFieldsSettable) {
             "index_block_restart_interval=4;"
             "filter_policy=bloomfilter:4:true;whole_key_filtering=1;"
             "skip_table_builder_flush=1;format_version=1;"
-            "hash_index_allow_collision=false;",
+            "hash_index_allow_collision=false;"
+            "verify_compression=true;",
             new_bbto));
 
   ASSERT_EQ(unset_bytes_base,
