From abb9b95ffeb83a8d1ac7da0454a85ecea99adea6 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Fri, 9 Jan 2015 12:57:11 -0800 Subject: [PATCH] Move compression functions from port/ to util/ Summary: We keep checksum functions in util/, there is no reason for compression to be in port/ Test Plan: compiles Reviewers: sdong Reviewed By: sdong Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D31281 --- db/filename.h | 4 +- port/port_posix.h | 352 -------------------------- table/block_based_table_builder.cc | 21 +- table/format.cc | 20 +- util/compression.h | 367 ++++++++++++++++++++++++++++ util/thread_status_updater.cc | 1 + util/thread_status_updater.h | 8 +- utilities/document/json_document.cc | 4 +- 8 files changed, 399 insertions(+), 378 deletions(-) create mode 100644 util/compression.h diff --git a/db/filename.h b/db/filename.h index 4136ff12e..fda873676 100644 --- a/db/filename.h +++ b/db/filename.h @@ -14,10 +14,12 @@ #include #include #include + +#include "port/port.h" +#include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/transaction_log.h" -#include "port/port.h" namespace rocksdb { diff --git a/port/port_posix.h b/port/port_posix.h index 476542cfc..f730c483b 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -34,27 +34,10 @@ #include #endif #include -#ifdef SNAPPY -#include -#endif - -#ifdef ZLIB -#include -#endif - -#ifdef BZIP2 -#include -#endif - -#if defined(LZ4) -#include -#include -#endif #include #include #include -#include "rocksdb/options.h" #ifndef PLATFORM_IS_LITTLE_ENDIAN #define PLATFORM_IS_LITTLE_ENDIAN (__BYTE_ORDER == __LITTLE_ENDIAN) @@ -149,341 +132,6 @@ typedef pthread_once_t OnceType; #define LEVELDB_ONCE_INIT PTHREAD_ONCE_INIT extern void InitOnce(OnceType* once, void (*initializer)()); -inline bool Snappy_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output) { -#ifdef SNAPPY - output->resize(snappy::MaxCompressedLength(length)); - size_t outlen; - snappy::RawCompress(input, length, &(*output)[0], &outlen); - output->resize(outlen); - return true; -#endif - - return false; -} - -inline bool Snappy_GetUncompressedLength(const char* input, size_t length, - size_t* result) { -#ifdef SNAPPY - return snappy::GetUncompressedLength(input, length, result); -#else - return false; -#endif -} - -inline bool Snappy_Uncompress(const char* input, size_t length, - char* output) { -#ifdef SNAPPY - return snappy::RawUncompress(input, length, output); -#else - return false; -#endif -} - -inline bool Zlib_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output) { -#ifdef ZLIB - // The memLevel parameter specifies how much memory should be allocated for - // the internal compression state. - // memLevel=1 uses minimum memory but is slow and reduces compression ratio. - // memLevel=9 uses maximum memory for optimal speed. - // The default value is 8. See zconf.h for more details. - static const int memLevel = 8; - z_stream _stream; - memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits, - memLevel, opts.strategy); - if (st != Z_OK) { - return false; - } - - // Resize output to be the plain data length. - // This may not be big enough if the compression actually expands data. - output->resize(length); - - // Compress the input, and put compressed data in output. - _stream.next_in = (Bytef *)input; - _stream.avail_in = static_cast(length); - - // Initialize the output size. - _stream.avail_out = static_cast(length); - _stream.next_out = (Bytef*)&(*output)[0]; - - size_t old_sz = 0, new_sz = 0, new_sz_delta = 0; - bool done = false; - while (!done) { - st = deflate(&_stream, Z_FINISH); - switch (st) { - case Z_STREAM_END: - done = true; - break; - case Z_OK: - // No output space. Increase the output space by 20%. - // (Should we fail the compression since it expands the size?) - old_sz = output->size(); - new_sz_delta = static_cast(output->size() * 0.2); - new_sz = output->size() + (new_sz_delta < 10 ? 10 : new_sz_delta); - output->resize(new_sz); - // Set more output. - _stream.next_out = (Bytef *)&(*output)[old_sz]; - _stream.avail_out = static_cast(new_sz - old_sz); - break; - case Z_BUF_ERROR: - default: - deflateEnd(&_stream); - return false; - } - } - - output->resize(output->size() - _stream.avail_out); - deflateEnd(&_stream); - return true; -#endif - return false; -} - -inline char* Zlib_Uncompress(const char* input_data, size_t input_length, - int* decompress_size, int windowBits = -14) { -#ifdef ZLIB - z_stream _stream; - memset(&_stream, 0, sizeof(z_stream)); - - // For raw inflate, the windowBits should be -8..-15. - // If windowBits is bigger than zero, it will use either zlib - // header or gzip header. Adding 32 to it will do automatic detection. - int st = inflateInit2(&_stream, - windowBits > 0 ? windowBits + 32 : windowBits); - if (st != Z_OK) { - return nullptr; - } - - _stream.next_in = (Bytef *)input_data; - _stream.avail_in = static_cast(input_length); - - // Assume the decompressed data size will 5x of compressed size. - size_t output_len = input_length * 5; - char* output = new char[output_len]; - size_t old_sz = output_len; - - _stream.next_out = (Bytef *)output; - _stream.avail_out = static_cast(output_len); - - char* tmp = nullptr; - size_t output_len_delta; - bool done = false; - - //while(_stream.next_in != nullptr && _stream.avail_in != 0) { - while (!done) { - st = inflate(&_stream, Z_SYNC_FLUSH); - switch (st) { - case Z_STREAM_END: - done = true; - break; - case Z_OK: - // No output space. Increase the output space by 20%. - old_sz = output_len; - output_len_delta = static_cast(output_len * 0.2); - output_len += output_len_delta < 10 ? 10 : output_len_delta; - tmp = new char[output_len]; - memcpy(tmp, output, old_sz); - delete[] output; - output = tmp; - - // Set more output. - _stream.next_out = (Bytef *)(output + old_sz); - _stream.avail_out = static_cast(output_len - old_sz); - break; - case Z_BUF_ERROR: - default: - delete[] output; - inflateEnd(&_stream); - return nullptr; - } - } - - *decompress_size = static_cast(output_len - _stream.avail_out); - inflateEnd(&_stream); - return output; -#endif - - return nullptr; -} - -inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output) { -#ifdef BZIP2 - bz_stream _stream; - memset(&_stream, 0, sizeof(bz_stream)); - - // Block size 1 is 100K. - // 0 is for silent. - // 30 is the default workFactor - int st = BZ2_bzCompressInit(&_stream, 1, 0, 30); - if (st != BZ_OK) { - return false; - } - - // Resize output to be the plain data length. - // This may not be big enough if the compression actually expands data. - output->resize(length); - - // Compress the input, and put compressed data in output. - _stream.next_in = (char *)input; - _stream.avail_in = static_cast(length); - - // Initialize the output size. - _stream.next_out = (char *)&(*output)[0]; - _stream.avail_out = static_cast(length); - - size_t old_sz = 0, new_sz = 0; - while (_stream.next_in != nullptr && _stream.avail_in != 0) { - st = BZ2_bzCompress(&_stream, BZ_FINISH); - switch (st) { - case BZ_STREAM_END: - break; - case BZ_FINISH_OK: - // No output space. Increase the output space by 20%. - // (Should we fail the compression since it expands the size?) - old_sz = output->size(); - new_sz = static_cast(output->size() * 1.2); - output->resize(new_sz); - // Set more output. - _stream.next_out = (char *)&(*output)[old_sz]; - _stream.avail_out = static_cast(new_sz - old_sz); - break; - case BZ_SEQUENCE_ERROR: - default: - BZ2_bzCompressEnd(&_stream); - return false; - } - } - - output->resize(output->size() - _stream.avail_out); - BZ2_bzCompressEnd(&_stream); - return true; -#endif - return false; -} - -inline char* BZip2_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { -#ifdef BZIP2 - bz_stream _stream; - memset(&_stream, 0, sizeof(bz_stream)); - - int st = BZ2_bzDecompressInit(&_stream, 0, 0); - if (st != BZ_OK) { - return nullptr; - } - - _stream.next_in = (char *)input_data; - _stream.avail_in = static_cast(input_length); - - // Assume the decompressed data size will be 5x of compressed size. - size_t output_len = input_length * 5; - char* output = new char[output_len]; - size_t old_sz = output_len; - - _stream.next_out = (char *)output; - _stream.avail_out = static_cast(output_len); - - char* tmp = nullptr; - - while(_stream.next_in != nullptr && _stream.avail_in != 0) { - st = BZ2_bzDecompress(&_stream); - switch (st) { - case BZ_STREAM_END: - break; - case BZ_OK: - // No output space. Increase the output space by 20%. - old_sz = output_len; - output_len = static_cast(output_len * 1.2); - tmp = new char[output_len]; - memcpy(tmp, output, old_sz); - delete[] output; - output = tmp; - - // Set more output. - _stream.next_out = (char *)(output + old_sz); - _stream.avail_out = static_cast(output_len - old_sz); - break; - default: - delete[] output; - BZ2_bzDecompressEnd(&_stream); - return nullptr; - } - } - - *decompress_size = static_cast(output_len - _stream.avail_out); - BZ2_bzDecompressEnd(&_stream); - return output; -#endif - return nullptr; -} - -inline bool LZ4_Compress(const CompressionOptions &opts, const char *input, - size_t length, ::std::string* output) { -#ifdef LZ4 - int compressBound = LZ4_compressBound(static_cast(length)); - output->resize(static_cast(8 + compressBound)); - char* p = const_cast(output->c_str()); - memcpy(p, &length, sizeof(length)); - int outlen = LZ4_compress_limitedOutput( - input, p + 8, static_cast(length), compressBound); - if (outlen == 0) { - return false; - } - output->resize(static_cast(8 + outlen)); - return true; -#endif - return false; -} - -inline char* LZ4_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { -#ifdef LZ4 - if (input_length < 8) { - return nullptr; - } - int output_len; - memcpy(&output_len, input_data, sizeof(output_len)); - char *output = new char[output_len]; - *decompress_size = LZ4_decompress_safe_partial( - input_data + 8, output, static_cast(input_length - 8), output_len, - output_len); - if (*decompress_size < 0) { - delete[] output; - return nullptr; - } - return output; -#endif - return nullptr; -} - -inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input, - size_t length, ::std::string* output) { -#ifdef LZ4 - int compressBound = LZ4_compressBound(static_cast(length)); - output->resize(static_cast(8 + compressBound)); - char* p = const_cast(output->c_str()); - memcpy(p, &length, sizeof(length)); - int outlen; -#ifdef LZ4_VERSION_MAJOR // they only started defining this since r113 - outlen = LZ4_compressHC2_limitedOutput(input, p + 8, static_cast(length), - compressBound, opts.level); -#else - outlen = LZ4_compressHC_limitedOutput(input, p + 8, static_cast(length), - compressBound); -#endif - if (outlen == 0) { - return false; - } - output->resize(static_cast(8 + outlen)); - return true; -#endif - return false; -} - #define CACHE_LINE_SIZE 64U #define PREFETCH(addr, rw, locality) __builtin_prefetch(addr, rw, locality) diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 0a93e309d..cdae8508b 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -39,6 +39,7 @@ #include "table/table_builder.h" #include "util/coding.h" +#include "util/compression.h" #include "util/crc32c.h" #include "util/stop_watch.h" #include "util/xxhash.h" @@ -312,36 +313,36 @@ Slice CompressBlock(const Slice& raw, // supported in this platform and (2) the compression rate is "good enough". switch (*type) { case kSnappyCompression: - if (port::Snappy_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + if (Snappy_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kZlibCompression: - if (port::Zlib_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + if (Zlib_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kBZip2Compression: - if (port::BZip2_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + if (BZip2_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4Compression: - if (port::LZ4_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + if (LZ4_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4HCCompression: - if (port::LZ4HC_Compress(compression_options, raw.data(), raw.size(), - compressed_output) && + if (LZ4HC_Compress(compression_options, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } diff --git a/table/format.cc b/table/format.cc index 227090bb2..c7f96f427 100644 --- a/table/format.cc +++ b/table/format.cc @@ -12,10 +12,10 @@ #include #include -#include "port/port.h" #include "rocksdb/env.h" #include "table/block.h" #include "util/coding.h" +#include "util/compression.h" #include "util/crc32c.h" #include "util/perf_context_imp.h" #include "util/xxhash.h" @@ -367,19 +367,19 @@ Status UncompressBlockContents(const char* data, size_t n, size_t ulength = 0; static char snappy_corrupt_msg[] = "Snappy not supported or corrupted Snappy compressed block contents"; - if (!port::Snappy_GetUncompressedLength(data, n, &ulength)) { + if (!Snappy_GetUncompressedLength(data, n, &ulength)) { return Status::Corruption(snappy_corrupt_msg); } ubuf = std::unique_ptr(new char[ulength]); - if (!port::Snappy_Uncompress(data, n, ubuf.get())) { + if (!Snappy_Uncompress(data, n, ubuf.get())) { return Status::Corruption(snappy_corrupt_msg); } *contents = BlockContents(std::move(ubuf), ulength, true, kNoCompression); break; } case kZlibCompression: - ubuf = std::unique_ptr( - port::Zlib_Uncompress(data, n, &decompress_size)); + ubuf = + std::unique_ptr(Zlib_Uncompress(data, n, &decompress_size)); if (!ubuf) { static char zlib_corrupt_msg[] = "Zlib not supported or corrupted Zlib compressed block contents"; @@ -389,8 +389,8 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kBZip2Compression: - ubuf = std::unique_ptr( - port::BZip2_Uncompress(data, n, &decompress_size)); + ubuf = + std::unique_ptr(BZip2_Uncompress(data, n, &decompress_size)); if (!ubuf) { static char bzip2_corrupt_msg[] = "Bzip2 not supported or corrupted Bzip2 compressed block contents"; @@ -400,8 +400,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4Compression: - ubuf = std::unique_ptr( - port::LZ4_Uncompress(data, n, &decompress_size)); + ubuf = std::unique_ptr(LZ4_Uncompress(data, n, &decompress_size)); if (!ubuf) { static char lz4_corrupt_msg[] = "LZ4 not supported or corrupted LZ4 compressed block contents"; @@ -411,8 +410,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4HCCompression: - ubuf = std::unique_ptr( - port::LZ4_Uncompress(data, n, &decompress_size)); + ubuf = std::unique_ptr(LZ4_Uncompress(data, n, &decompress_size)); if (!ubuf) { static char lz4hc_corrupt_msg[] = "LZ4HC not supported or corrupted LZ4HC compressed block contents"; diff --git a/util/compression.h b/util/compression.h new file mode 100644 index 000000000..a0ca91f7f --- /dev/null +++ b/util/compression.h @@ -0,0 +1,367 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +#pragma once + +#include "rocksdb/options.h" + +#ifdef SNAPPY +#include +#endif + +#ifdef ZLIB +#include +#endif + +#ifdef BZIP2 +#include +#endif + +#if defined(LZ4) +#include +#include +#endif + +namespace rocksdb { + +inline bool Snappy_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { +#ifdef SNAPPY + output->resize(snappy::MaxCompressedLength(length)); + size_t outlen; + snappy::RawCompress(input, length, &(*output)[0], &outlen); + output->resize(outlen); + return true; +#endif + + return false; +} + +inline bool Snappy_GetUncompressedLength(const char* input, size_t length, + size_t* result) { +#ifdef SNAPPY + return snappy::GetUncompressedLength(input, length, result); +#else + return false; +#endif +} + +inline bool Snappy_Uncompress(const char* input, size_t length, + char* output) { +#ifdef SNAPPY + return snappy::RawUncompress(input, length, output); +#else + return false; +#endif +} + +inline bool Zlib_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { +#ifdef ZLIB + // The memLevel parameter specifies how much memory should be allocated for + // the internal compression state. + // memLevel=1 uses minimum memory but is slow and reduces compression ratio. + // memLevel=9 uses maximum memory for optimal speed. + // The default value is 8. See zconf.h for more details. + static const int memLevel = 8; + z_stream _stream; + memset(&_stream, 0, sizeof(z_stream)); + int st = deflateInit2(&_stream, opts.level, Z_DEFLATED, opts.window_bits, + memLevel, opts.strategy); + if (st != Z_OK) { + return false; + } + + // Resize output to be the plain data length. + // This may not be big enough if the compression actually expands data. + output->resize(length); + + // Compress the input, and put compressed data in output. + _stream.next_in = (Bytef *)input; + _stream.avail_in = static_cast(length); + + // Initialize the output size. + _stream.avail_out = static_cast(length); + _stream.next_out = (Bytef*)&(*output)[0]; + + size_t old_sz = 0, new_sz = 0, new_sz_delta = 0; + bool done = false; + while (!done) { + st = deflate(&_stream, Z_FINISH); + switch (st) { + case Z_STREAM_END: + done = true; + break; + case Z_OK: + // No output space. Increase the output space by 20%. + // (Should we fail the compression since it expands the size?) + old_sz = output->size(); + new_sz_delta = static_cast(output->size() * 0.2); + new_sz = output->size() + (new_sz_delta < 10 ? 10 : new_sz_delta); + output->resize(new_sz); + // Set more output. + _stream.next_out = (Bytef *)&(*output)[old_sz]; + _stream.avail_out = static_cast(new_sz - old_sz); + break; + case Z_BUF_ERROR: + default: + deflateEnd(&_stream); + return false; + } + } + + output->resize(output->size() - _stream.avail_out); + deflateEnd(&_stream); + return true; +#endif + return false; +} + +inline char* Zlib_Uncompress(const char* input_data, size_t input_length, + int* decompress_size, int windowBits = -14) { +#ifdef ZLIB + z_stream _stream; + memset(&_stream, 0, sizeof(z_stream)); + + // For raw inflate, the windowBits should be -8..-15. + // If windowBits is bigger than zero, it will use either zlib + // header or gzip header. Adding 32 to it will do automatic detection. + int st = inflateInit2(&_stream, + windowBits > 0 ? windowBits + 32 : windowBits); + if (st != Z_OK) { + return nullptr; + } + + _stream.next_in = (Bytef *)input_data; + _stream.avail_in = static_cast(input_length); + + // Assume the decompressed data size will 5x of compressed size. + size_t output_len = input_length * 5; + char* output = new char[output_len]; + size_t old_sz = output_len; + + _stream.next_out = (Bytef *)output; + _stream.avail_out = static_cast(output_len); + + char* tmp = nullptr; + size_t output_len_delta; + bool done = false; + + //while(_stream.next_in != nullptr && _stream.avail_in != 0) { + while (!done) { + st = inflate(&_stream, Z_SYNC_FLUSH); + switch (st) { + case Z_STREAM_END: + done = true; + break; + case Z_OK: + // No output space. Increase the output space by 20%. + old_sz = output_len; + output_len_delta = static_cast(output_len * 0.2); + output_len += output_len_delta < 10 ? 10 : output_len_delta; + tmp = new char[output_len]; + memcpy(tmp, output, old_sz); + delete[] output; + output = tmp; + + // Set more output. + _stream.next_out = (Bytef *)(output + old_sz); + _stream.avail_out = static_cast(output_len - old_sz); + break; + case Z_BUF_ERROR: + default: + delete[] output; + inflateEnd(&_stream); + return nullptr; + } + } + + *decompress_size = static_cast(output_len - _stream.avail_out); + inflateEnd(&_stream); + return output; +#endif + + return nullptr; +} + +inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, + size_t length, ::std::string* output) { +#ifdef BZIP2 + bz_stream _stream; + memset(&_stream, 0, sizeof(bz_stream)); + + // Block size 1 is 100K. + // 0 is for silent. + // 30 is the default workFactor + int st = BZ2_bzCompressInit(&_stream, 1, 0, 30); + if (st != BZ_OK) { + return false; + } + + // Resize output to be the plain data length. + // This may not be big enough if the compression actually expands data. + output->resize(length); + + // Compress the input, and put compressed data in output. + _stream.next_in = (char *)input; + _stream.avail_in = static_cast(length); + + // Initialize the output size. + _stream.next_out = (char *)&(*output)[0]; + _stream.avail_out = static_cast(length); + + size_t old_sz = 0, new_sz = 0; + while (_stream.next_in != nullptr && _stream.avail_in != 0) { + st = BZ2_bzCompress(&_stream, BZ_FINISH); + switch (st) { + case BZ_STREAM_END: + break; + case BZ_FINISH_OK: + // No output space. Increase the output space by 20%. + // (Should we fail the compression since it expands the size?) + old_sz = output->size(); + new_sz = static_cast(output->size() * 1.2); + output->resize(new_sz); + // Set more output. + _stream.next_out = (char *)&(*output)[old_sz]; + _stream.avail_out = static_cast(new_sz - old_sz); + break; + case BZ_SEQUENCE_ERROR: + default: + BZ2_bzCompressEnd(&_stream); + return false; + } + } + + output->resize(output->size() - _stream.avail_out); + BZ2_bzCompressEnd(&_stream); + return true; +#endif + return false; +} + +inline char* BZip2_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { +#ifdef BZIP2 + bz_stream _stream; + memset(&_stream, 0, sizeof(bz_stream)); + + int st = BZ2_bzDecompressInit(&_stream, 0, 0); + if (st != BZ_OK) { + return nullptr; + } + + _stream.next_in = (char *)input_data; + _stream.avail_in = static_cast(input_length); + + // Assume the decompressed data size will be 5x of compressed size. + size_t output_len = input_length * 5; + char* output = new char[output_len]; + size_t old_sz = output_len; + + _stream.next_out = (char *)output; + _stream.avail_out = static_cast(output_len); + + char* tmp = nullptr; + + while(_stream.next_in != nullptr && _stream.avail_in != 0) { + st = BZ2_bzDecompress(&_stream); + switch (st) { + case BZ_STREAM_END: + break; + case BZ_OK: + // No output space. Increase the output space by 20%. + old_sz = output_len; + output_len = static_cast(output_len * 1.2); + tmp = new char[output_len]; + memcpy(tmp, output, old_sz); + delete[] output; + output = tmp; + + // Set more output. + _stream.next_out = (char *)(output + old_sz); + _stream.avail_out = static_cast(output_len - old_sz); + break; + default: + delete[] output; + BZ2_bzDecompressEnd(&_stream); + return nullptr; + } + } + + *decompress_size = static_cast(output_len - _stream.avail_out); + BZ2_bzDecompressEnd(&_stream); + return output; +#endif + return nullptr; +} + +inline bool LZ4_Compress(const CompressionOptions &opts, const char *input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(static_cast(length)); + output->resize(static_cast(8 + compressBound)); + char* p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + int outlen = LZ4_compress_limitedOutput( + input, p + 8, static_cast(length), compressBound); + if (outlen == 0) { + return false; + } + output->resize(static_cast(8 + outlen)); + return true; +#endif + return false; +} + +inline char* LZ4_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { +#ifdef LZ4 + if (input_length < 8) { + return nullptr; + } + int output_len; + memcpy(&output_len, input_data, sizeof(output_len)); + char *output = new char[output_len]; + *decompress_size = LZ4_decompress_safe_partial( + input_data + 8, output, static_cast(input_length - 8), output_len, + output_len); + if (*decompress_size < 0) { + delete[] output; + return nullptr; + } + return output; +#endif + return nullptr; +} + +inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(static_cast(length)); + output->resize(static_cast(8 + compressBound)); + char* p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + int outlen; +#ifdef LZ4_VERSION_MAJOR // they only started defining this since r113 + outlen = LZ4_compressHC2_limitedOutput(input, p + 8, static_cast(length), + compressBound, opts.level); +#else + outlen = LZ4_compressHC_limitedOutput(input, p + 8, static_cast(length), + compressBound); +#endif + if (outlen == 0) { + return false; + } + output->resize(static_cast(8 + outlen)); + return true; +#endif + return false; +} +} // namespace rocksdb diff --git a/util/thread_status_updater.cc b/util/thread_status_updater.cc index 119174db5..feb129885 100644 --- a/util/thread_status_updater.cc +++ b/util/thread_status_updater.cc @@ -3,6 +3,7 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. +#include #include "port/likely.h" #include "util/mutexlock.h" #include "util/thread_status_updater.h" diff --git a/util/thread_status_updater.h b/util/thread_status_updater.h index 8cb80022f..c97102a96 100644 --- a/util/thread_status_updater.h +++ b/util/thread_status_updater.h @@ -27,13 +27,15 @@ // This means user might not always get full information, but whenever // returned by the GetThreadList() is guaranteed to be consistent. #pragma once -#include #include +#include +#include +#include #include #include -#include -#include +#include #include + #include "rocksdb/status.h" #include "rocksdb/thread_status.h" #include "port/port_posix.h" diff --git a/utilities/document/json_document.cc b/utilities/document/json_document.cc index e5b745573..254574113 100644 --- a/utilities/document/json_document.cc +++ b/utilities/document/json_document.cc @@ -11,9 +11,11 @@ #endif #include + #include -#include #include +#include +#include #include #include "third-party/rapidjson/reader.h"