Introduce a helper method UncompressData (#7434)

Summary:
The patch introduces a helper method in `util/compression.h` called `UncompressData`
that dispatches calls to the correct uncompression method based on type, and changes
`UncompressBlockContentsForCompressionType` and `Benchmark::Uncompress` in
`db_bench` so they are implemented in terms of the new method. This eliminates
some code duplication. (`Benchmark::Compress` is also updated to use the previously
introduced `CompressData` helper.)

In addition, the patch brings the implementation of `Snappy_Uncompress` into sync with
the other uncompression methods by making the method compute the buffer size and allocate
the buffer itself. Finally, the patch eliminates some potentially risky back-and-forth conversions
between various unsigned and signed integer types by exposing the size of the allocated buffer
as a `size_t` instead of an `int`.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7434

Test Plan:
`make check`
`./db_bench -benchmarks=compress,uncompress --compression_type ...`

Reviewed By: riversand963

Differential Revision: D23900011

Pulled By: ltamasi

fbshipit-source-id: b25df63ceec4639889be94acb22eb53e530c54e0
main
Levi Tamasi 4 years ago committed by Facebook GitHub Bot
parent 9a63bbd391
commit 30fb9dd50f
  1. 15
      port/win/xpress_win.cc
  2. 3
      port/win/xpress_win.h
  3. 94
      table/format.cc
  4. 92
      tools/db_bench_tool.cc
  5. 106
      util/compression.h

@ -129,10 +129,9 @@ bool Compress(const char* input, size_t length, std::string* output) {
} }
char* Decompress(const char* input_data, size_t input_length, char* Decompress(const char* input_data, size_t input_length,
int* decompress_size) { size_t* uncompressed_size) {
assert(input_data != nullptr); assert(input_data != nullptr);
assert(decompress_size != nullptr); assert(uncompressed_size != nullptr);
if (input_length == 0) { if (input_length == 0) {
return nullptr; return nullptr;
@ -185,14 +184,6 @@ char* Decompress(const char* input_data, size_t input_length,
assert(decompressedBufferSize > 0); assert(decompressedBufferSize > 0);
// On Windows we are limited to a 32-bit int for the
// output data size argument
// so we hopefully never get here
if (decompressedBufferSize > std::numeric_limits<int>::max()) {
assert(false);
return nullptr;
}
// The callers are deallocating using delete[] // The callers are deallocating using delete[]
// thus we must allocate with new[] // thus we must allocate with new[]
std::unique_ptr<char[]> outputBuffer(new char[decompressedBufferSize]); std::unique_ptr<char[]> outputBuffer(new char[decompressedBufferSize]);
@ -216,7 +207,7 @@ char* Decompress(const char* input_data, size_t input_length,
return nullptr; return nullptr;
} }
*decompress_size = static_cast<int>(decompressedDataSize); *uncompressed_size = decompressedDataSize;
// Return the raw buffer to the caller supporting the tradition // Return the raw buffer to the caller supporting the tradition
return outputBuffer.release(); return outputBuffer.release();

@ -20,8 +20,7 @@ namespace xpress {
bool Compress(const char* input, size_t length, std::string* output); bool Compress(const char* input, size_t length, std::string* output);
char* Decompress(const char* input_data, size_t input_length, char* Decompress(const char* input_data, size_t input_length,
int* decompress_size); size_t* uncompressed_size);
} }
} }
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -347,100 +347,24 @@ Status UncompressBlockContentsForCompressionType(
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) { const ImmutableCFOptions& ioptions, MemoryAllocator* allocator) {
Status ret = Status::OK(); Status ret = Status::OK();
CacheAllocationPtr ubuf;
assert(uncompression_info.type() != kNoCompression && assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type"); "Invalid compression type");
StopWatchNano timer(ioptions.env, ShouldReportDetailedTime( StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(
ioptions.env, ioptions.statistics)); ioptions.env, ioptions.statistics));
int decompress_size = 0; size_t uncompressed_size = 0;
switch (uncompression_info.type()) { CacheAllocationPtr ubuf =
case kSnappyCompression: { UncompressData(uncompression_info, data, n, &uncompressed_size,
size_t ulength = 0; GetCompressFormatForVersion(format_version), allocator);
static char snappy_corrupt_msg[] =
"Snappy not supported or corrupted Snappy compressed block contents";
if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg);
}
ubuf = AllocateBlock(ulength, allocator);
if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), ulength);
break;
}
case kZlibCompression:
ubuf = Zlib_Uncompress(uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(format_version),
allocator);
if (!ubuf) {
static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents";
return Status::Corruption(zlib_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kBZip2Compression:
ubuf = BZip2_Uncompress(data, n, &decompress_size,
GetCompressFormatForVersion(format_version),
allocator);
if (!ubuf) {
static char bzip2_corrupt_msg[] =
"Bzip2 not supported or corrupted Bzip2 compressed block contents";
return Status::Corruption(bzip2_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kLZ4Compression:
ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(format_version),
allocator);
if (!ubuf) {
static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents";
return Status::Corruption(lz4_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kLZ4HCCompression:
ubuf = LZ4_Uncompress(uncompression_info, data, n, &decompress_size,
GetCompressFormatForVersion(format_version),
allocator);
if (!ubuf) {
static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents";
return Status::Corruption(lz4hc_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kXpressCompression:
// XPRESS allocates memory internally, thus no support for custom
// allocator.
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
"XPRESS not supported or corrupted XPRESS compressed block "
"contents";
return Status::Corruption(xpress_corrupt_msg);
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
case kZSTD:
case kZSTDNotFinalCompression:
ubuf = ZSTD_Uncompress(uncompression_info, data, n, &decompress_size,
allocator);
if (!ubuf) { if (!ubuf) {
static char zstd_corrupt_msg[] = return Status::Corruption(
"ZSTD not supported or corrupted ZSTD compressed block contents"; "Unsupported compression method or corrupted compressed block contents",
return Status::Corruption(zstd_corrupt_msg); CompressionTypeToString(uncompression_info.type()));
}
*contents = BlockContents(std::move(ubuf), decompress_size);
break;
default:
return Status::Corruption("bad block type");
} }
*contents = BlockContents(std::move(ubuf), uncompressed_size);
if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) { if (ShouldReportDetailedTime(ioptions.env, ioptions.statistics)) {
RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS, RecordTimeToHistogram(ioptions.statistics, DECOMPRESSION_TIMES_NANOS,
timer.ElapsedNanos()); timer.ElapsedNanos());

@ -2443,40 +2443,10 @@ class Benchmark {
inline bool CompressSlice(const CompressionInfo& compression_info, inline bool CompressSlice(const CompressionInfo& compression_info,
const Slice& input, std::string* compressed) { const Slice& input, std::string* compressed) {
bool ok = true; constexpr uint32_t compress_format_version = 2;
switch (FLAGS_compression_type_e) {
case ROCKSDB_NAMESPACE::kSnappyCompression: return CompressData(input, compression_info, compress_format_version,
ok = Snappy_Compress(compression_info, input.data(), input.size(),
compressed);
break;
case ROCKSDB_NAMESPACE::kZlibCompression:
ok = Zlib_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case ROCKSDB_NAMESPACE::kBZip2Compression:
ok = BZip2_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case ROCKSDB_NAMESPACE::kLZ4Compression:
ok = LZ4_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case ROCKSDB_NAMESPACE::kLZ4HCCompression:
ok = LZ4HC_Compress(compression_info, 2, input.data(), input.size(),
compressed);
break;
case ROCKSDB_NAMESPACE::kXpressCompression:
ok = XPRESS_Compress(input.data(),
input.size(), compressed);
break;
case ROCKSDB_NAMESPACE::kZSTD:
ok = ZSTD_Compress(compression_info, input.data(), input.size(),
compressed); compressed);
break;
default:
ok = false;
}
return ok;
} }
void PrintHeader() { void PrintHeader() {
@ -3601,57 +3571,15 @@ class Benchmark {
bool ok = CompressSlice(compression_info, input, &compressed); bool ok = CompressSlice(compression_info, input, &compressed);
int64_t bytes = 0; int64_t bytes = 0;
int decompress_size; size_t uncompressed_size = 0;
while (ok && bytes < 1024 * 1048576) { while (ok && bytes < 1024 * 1048576) {
CacheAllocationPtr uncompressed; constexpr uint32_t compress_format_version = 2;
switch (FLAGS_compression_type_e) {
case ROCKSDB_NAMESPACE::kSnappyCompression: { CacheAllocationPtr uncompressed = UncompressData(
// get size and allocate here to make comparison fair uncompression_info, compressed.data(), compressed.size(),
size_t ulength = 0; &uncompressed_size, compress_format_version);
if (!Snappy_GetUncompressedLength(compressed.data(),
compressed.size(), &ulength)) {
ok = false;
break;
}
uncompressed = AllocateBlock(ulength, nullptr);
ok = Snappy_Uncompress(compressed.data(), compressed.size(),
uncompressed.get());
break;
}
case ROCKSDB_NAMESPACE::kZlibCompression:
uncompressed =
Zlib_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
case ROCKSDB_NAMESPACE::kBZip2Compression:
uncompressed = BZip2_Uncompress(compressed.data(), compressed.size(),
&decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
case ROCKSDB_NAMESPACE::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
case ROCKSDB_NAMESPACE::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size, 2);
ok = uncompressed.get() != nullptr;
break;
case ROCKSDB_NAMESPACE::kXpressCompression:
uncompressed.reset(XPRESS_Uncompress(
compressed.data(), compressed.size(), &decompress_size));
ok = uncompressed.get() != nullptr;
break;
case ROCKSDB_NAMESPACE::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_info, compressed.data(),
compressed.size(), &decompress_size);
ok = uncompressed.get() != nullptr; ok = uncompressed.get() != nullptr;
break;
default:
ok = false;
}
bytes += input.size(); bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress); thread->stats.FinishedOps(nullptr, nullptr, 1, kUncompress);
} }

@ -616,26 +616,30 @@ inline bool Snappy_Compress(const CompressionInfo& /*info*/, const char* input,
#endif #endif
} }
inline bool Snappy_GetUncompressedLength(const char* input, size_t length, inline CacheAllocationPtr Snappy_Uncompress(
size_t* result) { const char* input, size_t length, size_t* uncompressed_size,
MemoryAllocator* allocator = nullptr) {
#ifdef SNAPPY #ifdef SNAPPY
return snappy::GetUncompressedLength(input, length, result); size_t uncompressed_length = 0;
#else if (!snappy::GetUncompressedLength(input, length, &uncompressed_length)) {
(void)input; return nullptr;
(void)length; }
(void)result;
return false;
#endif
}
inline bool Snappy_Uncompress(const char* input, size_t length, char* output) { CacheAllocationPtr output = AllocateBlock(uncompressed_length, allocator);
#ifdef SNAPPY
return snappy::RawUncompress(input, length, output); if (!snappy::RawUncompress(input, length, output.get())) {
return nullptr;
}
*uncompressed_size = uncompressed_length;
return output;
#else #else
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)uncompressed_size;
return false; (void)allocator;
return nullptr;
#endif #endif
} }
@ -754,7 +758,7 @@ inline bool Zlib_Compress(const CompressionInfo& info,
// dictionary. // dictionary.
inline CacheAllocationPtr Zlib_Uncompress( inline CacheAllocationPtr Zlib_Uncompress(
const UncompressionInfo& info, const char* input_data, size_t input_length, const UncompressionInfo& info, const char* input_data, size_t input_length,
int* decompress_size, uint32_t compress_format_version, size_t* uncompressed_size, uint32_t compress_format_version,
MemoryAllocator* allocator = nullptr, int windowBits = -14) { MemoryAllocator* allocator = nullptr, int windowBits = -14) {
#ifdef ZLIB #ifdef ZLIB
uint32_t output_len = 0; uint32_t output_len = 0;
@ -836,14 +840,15 @@ inline CacheAllocationPtr Zlib_Uncompress(
// If we encoded decompressed block size, we should have no bytes left // If we encoded decompressed block size, we should have no bytes left
assert(compress_format_version != 2 || _stream.avail_out == 0); assert(compress_format_version != 2 || _stream.avail_out == 0);
*decompress_size = static_cast<int>(output_len - _stream.avail_out); assert(output_len >= _stream.avail_out);
*uncompressed_size = output_len - _stream.avail_out;
inflateEnd(&_stream); inflateEnd(&_stream);
return output; return output;
#else #else
(void)info; (void)info;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)uncompressed_size;
(void)compress_format_version; (void)compress_format_version;
(void)allocator; (void)allocator;
(void)windowBits; (void)windowBits;
@ -917,7 +922,7 @@ inline bool BZip2_Compress(const CompressionInfo& /*info*/,
// compress_format_version == 2 -- decompressed size is included in the block // compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format // header in varint32 format
inline CacheAllocationPtr BZip2_Uncompress( inline CacheAllocationPtr BZip2_Uncompress(
const char* input_data, size_t input_length, int* decompress_size, const char* input_data, size_t input_length, size_t* uncompressed_size,
uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) { uint32_t compress_format_version, MemoryAllocator* allocator = nullptr) {
#ifdef BZIP2 #ifdef BZIP2
uint32_t output_len = 0; uint32_t output_len = 0;
@ -982,13 +987,14 @@ inline CacheAllocationPtr BZip2_Uncompress(
// If we encoded decompressed block size, we should have no bytes left // If we encoded decompressed block size, we should have no bytes left
assert(compress_format_version != 2 || _stream.avail_out == 0); assert(compress_format_version != 2 || _stream.avail_out == 0);
*decompress_size = static_cast<int>(output_len - _stream.avail_out); assert(output_len >= _stream.avail_out);
*uncompressed_size = output_len - _stream.avail_out;
BZ2_bzDecompressEnd(&_stream); BZ2_bzDecompressEnd(&_stream);
return output; return output;
#else #else
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)uncompressed_size;
(void)compress_format_version; (void)compress_format_version;
(void)allocator; (void)allocator;
return nullptr; return nullptr;
@ -1073,7 +1079,7 @@ inline bool LZ4_Compress(const CompressionInfo& info,
inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info, inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
const char* input_data, const char* input_data,
size_t input_length, size_t input_length,
int* decompress_size, size_t* uncompressed_size,
uint32_t compress_format_version, uint32_t compress_format_version,
MemoryAllocator* allocator = nullptr) { MemoryAllocator* allocator = nullptr) {
#ifdef LZ4 #ifdef LZ4
@ -1096,6 +1102,9 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
} }
auto output = AllocateBlock(output_len, allocator); auto output = AllocateBlock(output_len, allocator);
int decompress_bytes = 0;
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
const Slice& compression_dict = info.dict().GetRawDict(); const Slice& compression_dict = info.dict().GetRawDict();
@ -1103,26 +1112,27 @@ inline CacheAllocationPtr LZ4_Uncompress(const UncompressionInfo& info,
LZ4_setStreamDecode(stream, compression_dict.data(), LZ4_setStreamDecode(stream, compression_dict.data(),
static_cast<int>(compression_dict.size())); static_cast<int>(compression_dict.size()));
} }
*decompress_size = LZ4_decompress_safe_continue( decompress_bytes = LZ4_decompress_safe_continue(
stream, input_data, output.get(), static_cast<int>(input_length), stream, input_data, output.get(), static_cast<int>(input_length),
static_cast<int>(output_len)); static_cast<int>(output_len));
LZ4_freeStreamDecode(stream); LZ4_freeStreamDecode(stream);
#else // up to r123 #else // up to r123
*decompress_size = LZ4_decompress_safe(input_data, output.get(), decompress_bytes = LZ4_decompress_safe(input_data, output.get(),
static_cast<int>(input_length), static_cast<int>(input_length),
static_cast<int>(output_len)); static_cast<int>(output_len));
#endif // LZ4_VERSION_NUMBER >= 10400 #endif // LZ4_VERSION_NUMBER >= 10400
if (*decompress_size < 0) { if (decompress_bytes < 0) {
return nullptr; return nullptr;
} }
assert(*decompress_size == static_cast<int>(output_len)); assert(decompress_bytes == static_cast<int>(output_len));
*uncompressed_size = decompress_bytes;
return output; return output;
#else // LZ4 #else // LZ4
(void)info; (void)info;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)uncompressed_size;
(void)compress_format_version; (void)compress_format_version;
(void)allocator; (void)allocator;
return nullptr; return nullptr;
@ -1227,13 +1237,13 @@ inline bool XPRESS_Compress(const char* /*input*/, size_t /*length*/,
#ifdef XPRESS #ifdef XPRESS
inline char* XPRESS_Uncompress(const char* input_data, size_t input_length, inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) { size_t* uncompressed_size) {
return port::xpress::Decompress(input_data, input_length, decompress_size); return port::xpress::Decompress(input_data, input_length, uncompressed_size);
} }
#else #else
inline char* XPRESS_Uncompress(const char* /*input_data*/, inline char* XPRESS_Uncompress(const char* /*input_data*/,
size_t /*input_length*/, size_t /*input_length*/,
int* /*decompress_size*/) { size_t* /*uncompressed_size*/) {
return nullptr; return nullptr;
} }
#endif #endif
@ -1298,7 +1308,7 @@ inline bool ZSTD_Compress(const CompressionInfo& info, const char* input,
// dictionary. // dictionary.
inline CacheAllocationPtr ZSTD_Uncompress( inline CacheAllocationPtr ZSTD_Uncompress(
const UncompressionInfo& info, const char* input_data, size_t input_length, const UncompressionInfo& info, const char* input_data, size_t input_length,
int* decompress_size, MemoryAllocator* allocator = nullptr) { size_t* uncompressed_size, MemoryAllocator* allocator = nullptr) {
#ifdef ZSTD #ifdef ZSTD
uint32_t output_len = 0; uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@ -1329,13 +1339,13 @@ inline CacheAllocationPtr ZSTD_Uncompress(
ZSTD_decompress(output.get(), output_len, input_data, input_length); ZSTD_decompress(output.get(), output_len, input_data, input_length);
#endif // ZSTD_VERSION_NUMBER >= 500 #endif // ZSTD_VERSION_NUMBER >= 500
assert(actual_output_length == output_len); assert(actual_output_length == output_len);
*decompress_size = static_cast<int>(actual_output_length); *uncompressed_size = actual_output_length;
return output; return output;
#else // ZSTD #else // ZSTD
(void)info; (void)info;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)uncompressed_size;
(void)allocator; (void)allocator;
return nullptr; return nullptr;
#endif #endif
@ -1449,4 +1459,34 @@ inline bool CompressData(const Slice& raw,
return ret; return ret;
} }
inline CacheAllocationPtr UncompressData(
const UncompressionInfo& uncompression_info, const char* data, size_t n,
size_t* uncompressed_size, uint32_t compress_format_version,
MemoryAllocator* allocator = nullptr) {
switch (uncompression_info.type()) {
case kSnappyCompression:
return Snappy_Uncompress(data, n, uncompressed_size, allocator);
case kZlibCompression:
return Zlib_Uncompress(uncompression_info, data, n, uncompressed_size,
compress_format_version, allocator);
case kBZip2Compression:
return BZip2_Uncompress(data, n, uncompressed_size,
compress_format_version, allocator);
case kLZ4Compression:
case kLZ4HCCompression:
return LZ4_Uncompress(uncompression_info, data, n, uncompressed_size,
compress_format_version, allocator);
case kXpressCompression:
// XPRESS allocates memory internally, thus no support for custom
// allocator.
return CacheAllocationPtr(XPRESS_Uncompress(data, n, uncompressed_size));
case kZSTD:
case kZSTDNotFinalCompression:
return ZSTD_Uncompress(uncompression_info, data, n, uncompressed_size,
allocator);
default:
return CacheAllocationPtr();
}
}
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save