From df2f92214ac834ac3c0790516cd05837173f48fe Mon Sep 17 00:00:00 2001 From: Albert Strasheim Date: Fri, 7 Feb 2014 18:12:30 -0800 Subject: [PATCH] Support for LZ4 compression. --- build_tools/build_detect_platform | 14 ++- db/db_bench.cc | 138 ++++++++++++++++++++++++----- db/db_test.cc | 22 ++++- include/rocksdb/c.h | 6 +- include/rocksdb/options.h | 6 +- port/port_posix.h | 68 +++++++++++++- table/block_based_table_builder.cc | 24 +++++ table/format.cc | 22 +++++ table/table_test.cc | 72 ++++++++++++--- tools/db_stress.cc | 11 ++- util/ldb_cmd.cc | 4 + 11 files changed, 345 insertions(+), 42 deletions(-) diff --git a/build_tools/build_detect_platform b/build_tools/build_detect_platform index 8e83ae497..74fa72182 100755 --- a/build_tools/build_detect_platform +++ b/build_tools/build_detect_platform @@ -19,7 +19,8 @@ # # -DLEVELDB_PLATFORM_POSIX if cstdatomic is present # -DLEVELDB_PLATFORM_NOATOMIC if it is not -# -DSNAPPY if the Snappy library is present +# -DSNAPPY if the Snappy library is present +# -DLZ4 if the LZ4 library is present # # Using gflags in rocksdb: # Our project depends on gflags, which requires users to take some extra steps @@ -244,6 +245,17 @@ EOF PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -lbz2" fi + # Test whether lz4 library is installed + $CXX $CFLAGS $COMMON_FLAGS -x c++ - -o /dev/null 2>/dev/null < + #include + int main() {} +EOF + if [ "$?" = 0 ]; then + COMMON_FLAGS="$COMMON_FLAGS -DLZ4" + PLATFORM_LDFLAGS="$PLATFORM_LDFLAGS -llz4" + fi + # Test whether tcmalloc is available $CXX $CFLAGS -x c++ - -o /dev/null -ltcmalloc 2>/dev/null <stats.FinishedSingleOp(nullptr); } if (!ok) { - thread->stats.AddMessage("(snappy failure)"); + thread->stats.AddMessage("(compression failure)"); } else { char buf[100]; snprintf(buf, sizeof(buf), "(output: %.1f%%)", @@ -1328,24 +1372,78 @@ class Benchmark { } } - void SnappyUncompress(ThreadState* thread) { + void Uncompress(ThreadState *thread) { RandomGenerator gen; Slice input = gen.Generate(Options().block_size); std::string compressed; - bool ok = port::Snappy_Compress(Options().compression_opts, input.data(), - input.size(), &compressed); + + bool ok; + switch (FLAGS_compression_type_e) { + case rocksdb::kSnappyCompression: + ok = port::Snappy_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kZlibCompression: + ok = port::Zlib_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kBZip2Compression: + ok = port::BZip2_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4Compression: + ok = port::LZ4_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + case rocksdb::kLZ4HCCompression: + ok = port::LZ4HC_Compress(Options().compression_opts, input.data(), + input.size(), &compressed); + break; + default: + ok = false; + } + int64_t bytes = 0; - char* uncompressed = new char[input.size()]; - while (ok && bytes < 1024 * 1048576) { // Compress 1G - ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), - uncompressed); + int decompress_size; + while (ok && bytes < 1024 * 1048576) { + char *uncompressed = nullptr; + switch (FLAGS_compression_type_e) { + case rocksdb::kSnappyCompression: + // allocate here to make comparison fair + uncompressed = new char[input.size()]; + ok = port::Snappy_Uncompress(compressed.data(), compressed.size(), + uncompressed); + break; + case rocksdb::kZlibCompression: + uncompressed = port::Zlib_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kBZip2Compression: + uncompressed = port::BZip2_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kLZ4Compression: + uncompressed = port::LZ4_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + case rocksdb::kLZ4HCCompression: + uncompressed = port::LZ4_Uncompress( + compressed.data(), compressed.size(), &decompress_size); + ok = uncompressed != nullptr; + break; + default: + ok = false; + } + delete[] uncompressed; bytes += input.size(); thread->stats.FinishedSingleOp(nullptr); } - delete[] uncompressed; if (!ok) { - thread->stats.AddMessage("(snappy failure)"); + thread->stats.AddMessage("(compression failure)"); } else { thread->stats.AddBytes(bytes); } diff --git a/db/db_test.cc b/db/db_test.cc index 23fb63aad..1583e9416 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -56,7 +56,19 @@ static bool BZip2CompressionSupported(const CompressionOptions& options) { return port::BZip2_Compress(options, in.data(), in.size(), &out); } -static std::string RandomString(Random* rnd, int len) { +static bool LZ4CompressionSupported(const CompressionOptions &options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4_Compress(options, in.data(), in.size(), &out); +} + +static bool LZ4HCCompressionSupported(const CompressionOptions &options) { + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4HC_Compress(options, in.data(), in.size(), &out); +} + +static std::string RandomString(Random *rnd, int len) { std::string r; test::RandomString(rnd, len, &r); return r; @@ -2624,6 +2636,14 @@ bool MinLevelToCompress(CompressionType& type, Options& options, int wbits, CompressionOptions(wbits, lev, strategy))) { type = kBZip2Compression; fprintf(stderr, "using bzip2\n"); + } else if (LZ4CompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kLZ4Compression; + fprintf(stderr, "using lz4\n"); + } else if (LZ4HCCompressionSupported( + CompressionOptions(wbits, lev, strategy))) { + type = kLZ4HCCompression; + fprintf(stderr, "using lz4hc\n"); } else { fprintf(stderr, "skipping test, compression disabled\n"); return false; diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index bd22e191b..405b292da 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -238,8 +238,10 @@ extern void rocksdb_options_set_memtable_vector_rep(rocksdb_options_t*); enum { rocksdb_no_compression = 0, rocksdb_snappy_compression = 1, - rocksdb_zlib_compression = 1, - rocksdb_bz2_compression = 1 + rocksdb_zlib_compression = 2, + rocksdb_bz2_compression = 3, + rocksdb_lz4_compression = 4, + rocksdb_lz4hc_compression = 5 }; extern void rocksdb_options_set_compression(rocksdb_options_t*, int); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 1c55232d0..5b11a2c79 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -45,10 +45,8 @@ using std::shared_ptr; enum CompressionType : char { // NOTE: do not change the values of existing entries, as these are // part of the persistent format on disk. - kNoCompression = 0x0, - kSnappyCompression = 0x1, - kZlibCompression = 0x2, - kBZip2Compression = 0x3 + kNoCompression = 0x0, kSnappyCompression = 0x1, kZlibCompression = 0x2, + kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5 }; enum CompactionStyle : char { diff --git a/port/port_posix.h b/port/port_posix.h index 839e89afe..aaea0b574 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -46,6 +46,11 @@ #include #endif +#if defined(LZ4) +#include +#include +#endif + #include #include #include @@ -353,8 +358,8 @@ inline bool BZip2_Compress(const CompressionOptions& opts, const char* input, return false; } -inline char* BZip2_Uncompress(const char* input_data, size_t input_length, - int* decompress_size) { +inline char* BZip2_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { #ifdef BZIP2 bz_stream _stream; memset(&_stream, 0, sizeof(bz_stream)); @@ -409,7 +414,64 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, return nullptr; } -inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { +inline bool LZ4_Compress(const CompressionOptions &opts, const char *input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(length); + output->resize(8 + compressBound); + char *p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + size_t outlen; + outlen = LZ4_compress_limitedOutput(input, p + 8, length, compressBound); + if (outlen == 0) { + return false; + } + output->resize(8 + outlen); + return true; +#endif + return false; +} + +inline char* LZ4_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { +#ifdef LZ4 + if (input_length < 8) { + return nullptr; + } + int output_len; + memcpy(&output_len, input_data, sizeof(output_len)); + char *output = new char[output_len]; + *decompress_size = LZ4_decompress_safe_partial( + input_data + 8, output, input_length - 8, output_len, output_len); + if (*decompress_size < 0) { + delete[] output; + return nullptr; + } + return output; +#endif + return nullptr; +} + +inline bool LZ4HC_Compress(const CompressionOptions &opts, const char* input, + size_t length, ::std::string* output) { +#ifdef LZ4 + int compressBound = LZ4_compressBound(length); + output->resize(8 + compressBound); + char *p = const_cast(output->c_str()); + memcpy(p, &length, sizeof(length)); + size_t outlen; + outlen = LZ4_compressHC2_limitedOutput(input, p + 8, length, compressBound, + opts.level); + if (outlen == 0) { + return false; + } + output->resize(8 + outlen); + return true; +#endif + return false; +} + +inline bool GetHeapProfile(void (*func)(void *, const char *, int), void *arg) { return false; } diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index e5f3bd4d2..75f204670 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -233,6 +233,30 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, type = kNoCompression; } break; + case kLZ4Compression: + if (port::LZ4_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && + GoodCompressionRatio(compressed->size(), raw.size())) { + block_contents = *compressed; + } else { + // LZ4 not supported, or not good compression ratio, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; + case kLZ4HCCompression: + if (port::LZ4HC_Compress(r->options.compression_opts, raw.data(), + raw.size(), compressed) && + GoodCompressionRatio(compressed->size(), raw.size())) { + block_contents = *compressed; + } else { + // LZ4 not supported, or not good compression ratio, so just + // store uncompressed form + block_contents = raw; + type = kNoCompression; + } + break; } WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); diff --git a/table/format.cc b/table/format.cc index 561d1689a..698c2953e 100644 --- a/table/format.cc +++ b/table/format.cc @@ -228,6 +228,28 @@ Status UncompressBlockContents(const char* data, size_t n, result->heap_allocated = true; result->cachable = true; break; + case kLZ4Compression: + ubuf = port::LZ4_Uncompress(data, n, &decompress_size); + static char lz4_corrupt_msg[] = + "LZ4 not supported or corrupted LZ4 compressed block contents"; + if (!ubuf) { + return Status::Corruption(lz4_corrupt_msg); + } + result->data = Slice(ubuf, decompress_size); + result->heap_allocated = true; + result->cachable = true; + break; + case kLZ4HCCompression: + ubuf = port::LZ4_Uncompress(data, n, &decompress_size); + static char lz4hc_corrupt_msg[] = + "LZ4HC not supported or corrupted LZ4HC compressed block contents"; + if (!ubuf) { + return Status::Corruption(lz4hc_corrupt_msg); + } + result->data = Slice(ubuf, decompress_size); + result->heap_allocated = true; + result->cachable = true; + break; default: return Status::Corruption("bad block type"); } diff --git a/table/table_test.cc b/table/table_test.cc index 34a1932a8..408a5ad50 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -487,30 +487,62 @@ class DBConstructor: public Constructor { }; static bool SnappyCompressionSupported() { +#ifdef SNAPPY std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::Snappy_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif } static bool ZlibCompressionSupported() { +#ifdef ZLIB std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::Zlib_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif } -#ifdef BZIP2 static bool BZip2CompressionSupported() { +#ifdef BZIP2 std::string out; Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; return port::BZip2_Compress(Options().compression_opts, in.data(), in.size(), &out); +#else + return false; +#endif +} + +static bool LZ4CompressionSupported() { +#ifdef LZ4 + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4_Compress(Options().compression_opts, in.data(), in.size(), + &out); +#else + return false; +#endif } + +static bool LZ4HCCompressionSupported() { +#ifdef LZ4 + std::string out; + Slice in = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; + return port::LZ4HC_Compress(Options().compression_opts, in.data(), in.size(), + &out); +#else + return false; #endif +} enum TestType { BLOCK_BASED_TABLE_TEST, @@ -538,24 +570,23 @@ static std::vector GenerateArgList() { std::vector restart_intervals = {16, 1, 1024}; // Only add compression if it is supported - std::vector compression_types = {kNoCompression}; -#ifdef SNAPPY + std::vector compression_types; + compression_types.push_back(kNoCompression); if (SnappyCompressionSupported()) { compression_types.push_back(kSnappyCompression); } -#endif - -#ifdef ZLIB if (ZlibCompressionSupported()) { compression_types.push_back(kZlibCompression); } -#endif - -#ifdef BZIP2 if (BZip2CompressionSupported()) { compression_types.push_back(kBZip2Compression); } -#endif + if (LZ4CompressionSupported()) { + compression_types.push_back(kLZ4Compression); + } + if (LZ4HCCompressionSupported()) { + compression_types.push_back(kLZ4HCCompression); + } for (auto test_type : test_types) { for (auto reverse_compare : reverse_compare_types) { @@ -1322,6 +1353,27 @@ TEST(GeneralTableTest, ApproximateOffsetOfCompressed) { valid++; } + if (!BZip2CompressionSupported()) { + fprintf(stderr, "skipping bzip2 compression tests\n"); + } else { + compression_state[valid] = kBZip2Compression; + valid++; + } + + if (!LZ4CompressionSupported()) { + fprintf(stderr, "skipping lz4 compression tests\n"); + } else { + compression_state[valid] = kLZ4Compression; + valid++; + } + + if (!LZ4HCCompressionSupported()) { + fprintf(stderr, "skipping lz4hc compression tests\n"); + } else { + compression_state[valid] = kLZ4HCCompression; + valid++; + } + for (int i = 0; i < valid; i++) { DoCompressionTest(compression_state[i]); } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index bad2cf0d6..9bb581a5b 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -273,6 +273,10 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { return rocksdb::kZlibCompression; else if (!strcasecmp(ctype, "bzip2")) return rocksdb::kBZip2Compression; + else if (!strcasecmp(ctype, "lz4")) + return rocksdb::kLZ4Compression; + else if (!strcasecmp(ctype, "lz4hc")) + return rocksdb::kLZ4HCCompression; fprintf(stdout, "Cannot parse compression type '%s'\n", ctype); return rocksdb::kSnappyCompression; //default value @@ -1328,7 +1332,12 @@ class StressTest { case rocksdb::kBZip2Compression: compression = "bzip2"; break; - } + case rocksdb::kLZ4Compression: + compression = "lz4"; + case rocksdb::kLZ4HCCompression: + compression = "lz4hc"; + break; + } fprintf(stdout, "Compression : %s\n", compression); diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 86635a695..f373ea18d 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -244,6 +244,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() { opt.compression = kZlibCompression; } else if (comp == "bzip2") { opt.compression = kBZip2Compression; + } else if (comp == "lz4") { + opt.compression = kLZ4Compression; + } else if (comp == "lz4hc") { + opt.compression = kLZ4HCCompression; } else { // Unknown compression. exec_state_ = LDBCommandExecuteResult::FAILED(