From ee221d2de0315f933282eb663597136d75d727c8 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Tue, 19 Apr 2016 22:54:24 -0700 Subject: [PATCH] Introduce XPRESS compresssion on Windows. (#1081) Comparable with Snappy on comp ratio. Implemented using Windows API, does not require external package. Avaiable since Windows 8 and server 2012. Use -DXPRESS=1 with CMake to enable. --- CMakeLists.txt | 5 +- db/compaction_job_stats_test.cc | 3 + db/db_test.cc | 40 +++-- db/db_wal_test.cc | 5 +- include/rocksdb/options.h | 1 + port/win/port_win.h | 17 -- port/win/xpress_win.cc | 270 +++++++++++++++++++++++++++++ port/win/xpress_win.h | 26 +++ port/xpress.h | 17 ++ table/block_based_table_builder.cc | 7 + table/format.cc | 23 ++- table/format.h | 1 + table/table_test.cc | 11 ++ thirdparty.inc | 18 ++ tools/db_bench_tool.cc | 11 ++ tools/db_stress.cc | 2 + tools/ldb_cmd.cc | 2 + tools/sst_dump_tool.cc | 36 ++-- util/compression.h | 31 ++++ util/options_helper.h | 1 + util/options_test.cc | 10 +- 21 files changed, 467 insertions(+), 70 deletions(-) create mode 100644 port/win/xpress_win.cc create mode 100644 port/win/xpress_win.h create mode 100644 port/xpress.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 0de146867..f13452b3e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -82,7 +82,7 @@ set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /Oxt /Zp8 /Gm- /Gy /MD") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG") -add_definitions(-DWIN32 -DOS_WIN -D_MBCS -DWIN64) +add_definitions(-DWIN32 -DOS_WIN -D_MBCS -DWIN64 -DNOMINMAX) include_directories(${PROJECT_SOURCE_DIR}) include_directories(${PROJECT_SOURCE_DIR}/include) @@ -90,7 +90,7 @@ include_directories(${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src) set(ROCKSDB_LIBS rocksdblib${ARTIFACT_SUFFIX}) set(THIRDPARTY_LIBS ${THIRDPARTY_LIBS} gtest) -set(SYSTEM_LIBS Shlwapi.lib Rpcrt4.lib) +set(SYSTEM_LIBS ${SYSTEM_LIBS} Shlwapi.lib Rpcrt4.lib) set(LIBS ${ROCKSDB_LIBS} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) @@ -156,6 +156,7 @@ set(SOURCES port/win/env_win.cc port/win/port_win.cc port/win/win_logger.cc + port/win/xpress_win.cc table/adaptive_table_factory.cc table/block.cc table/block_based_filter_block.cc diff --git a/db/compaction_job_stats_test.cc b/db/compaction_job_stats_test.cc index 2cd317404..a05b0ba64 100644 --- a/db/compaction_job_stats_test.cc +++ b/db/compaction_job_stats_test.cc @@ -624,7 +624,10 @@ CompressionType GetAnyCompression() { return kBZip2Compression; } else if (LZ4_Supported()) { return kLZ4Compression; + } else if (XPRESS_Supported()) { + return kXpressCompression; } + return kNoCompression; } diff --git a/db/db_test.cc b/db/db_test.cc index caa230b6a..310142de1 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3184,8 +3184,8 @@ class ModelDB : public DB { virtual Status GetUpdatesSince( rocksdb::SequenceNumber, unique_ptr*, - const TransactionLogIterator::ReadOptions& read_options = - TransactionLogIterator::ReadOptions()) override { + const TransactionLogIterator::ReadOptions& + read_options = TransactionLogIterator::ReadOptions()) override { return Status::NotSupported("Not supported in Model DB"); } @@ -3271,7 +3271,8 @@ static bool CompareIterators(int step, DB* model, DB* db, ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) { count++; if (miter->key().compare(dbiter->key()) != 0) { - fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step, + fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", + step, EscapeString(miter->key()).c_str(), EscapeString(dbiter->key()).c_str()); ok = false; @@ -3474,6 +3475,7 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) { options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.prefix_extractor.reset(NewFixedPrefixTransform(1)); + Reopen(options); ASSERT_OK(Put("k1", "v1")); Flush(); @@ -3672,15 +3674,15 @@ TEST_F(DBTest, TableOptionsSanitizeTest) { ASSERT_OK(TryReopen(options)); } + +// On Windows you can have either memory mapped file or a file +// with unbuffered access. So this asserts and does not make +// sense to run +#ifndef OS_WIN TEST_F(DBTest, MmapAndBufferOptions) { Options options = CurrentOptions(); - // If allow_mmap_reads is on allow_os_buffer must also be on - // On Windows you can have either memory mapped file or a file - // with unbuffered access. -#ifndef OS_WIN options.allow_os_buffer = false; -#endif options.allow_mmap_reads = true; ASSERT_NOK(TryReopen(options)); @@ -3695,6 +3697,7 @@ TEST_F(DBTest, MmapAndBufferOptions) { options.allow_os_buffer = true; ASSERT_OK(TryReopen(options)); } +#endif TEST_F(DBTest, ConcurrentMemtableNotSupported) { Options options = CurrentOptions(); @@ -4948,10 +4951,12 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) { // iter 1 -- bzip2 // iter 2 -- lz4 // iter 3 -- lz4HC + // iter 4 -- xpress CompressionType compressions[] = {kZlibCompression, kBZip2Compression, - kLZ4Compression, kLZ4HCCompression}; - for (int iter = 0; iter < 4; ++iter) { - if (!CompressionTypeSupported(compressions[iter])) { + kLZ4Compression, kLZ4HCCompression, + kXpressCompression}; + for (auto comp : compressions) { + if (!CompressionTypeSupported(comp)) { continue; } // first_table_version 1 -- generate with table_version == 1, read with @@ -4966,7 +4971,7 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) { Options options = CurrentOptions(); options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.create_if_missing = true; - options.compression = compressions[iter]; + options.compression = comp; DestroyAndReopen(options); int kNumKeysWritten = 100000; @@ -5809,18 +5814,19 @@ TEST_F(DBTest, LastWriteBufferDelay) { TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { CompressionType compressions[] = {kZlibCompression, kBZip2Compression, - kLZ4Compression, kLZ4HCCompression}; - for (int iter = 0; iter < 4; ++iter) { - if (!CompressionTypeSupported(compressions[iter])) { + kLZ4Compression, kLZ4HCCompression, + kXpressCompression}; + for (auto comp : compressions) { + if (!CompressionTypeSupported(comp)) { // not supported, we should fail the Open() Options options = CurrentOptions(); - options.compression = compressions[iter]; + options.compression = comp; ASSERT_TRUE(!TryReopen(options).ok()); // Try if CreateColumnFamily also fails options.compression = kNoCompression; ASSERT_OK(TryReopen(options)); ColumnFamilyOptions cf_options(options); - cf_options.compression = compressions[iter]; + cf_options.compression = comp; ColumnFamilyHandle* handle; ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok()); } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index bb944735d..d8831ef3d 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -584,8 +584,11 @@ class RecoveryTestHelper { int fd = open(filename.c_str(), O_RDWR); + // On windows long is 32-bit + ASSERT_LE(offset, std::numeric_limits::max()); + ASSERT_GT(fd, 0); - ASSERT_EQ(offset, lseek(fd, offset, SEEK_SET)); + ASSERT_EQ(offset, lseek(fd, static_cast(offset), SEEK_SET)); void* buf = alloca(len); memset(buf, 'a', len); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 955ab240d..10984b297 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -61,6 +61,7 @@ enum CompressionType : char { kBZip2Compression = 0x3, kLZ4Compression = 0x4, kLZ4HCCompression = 0x5, + kXpressCompression = 0x6, // zstd format is not finalized yet so it's subject to changes. kZSTDNotFinalCompression = 0x40, }; diff --git a/port/win/port_win.h b/port/win/port_win.h index 1c3e94ef1..116ebc4d9 100644 --- a/port/win/port_win.h +++ b/port/win/port_win.h @@ -56,23 +56,6 @@ typedef SSIZE_T ssize_t; #define __attribute__(A) -#ifdef ZLIB -#include -#endif - -#ifdef BZIP2 -#include -#endif - -#if defined(LZ4) -#include -#include -#endif - -#ifdef SNAPPY -#include -#endif - // Thread local storage on Linux // There is thread_local in C++11 #ifndef __thread diff --git a/port/win/xpress_win.cc b/port/win/xpress_win.cc new file mode 100644 index 000000000..e183078ea --- /dev/null +++ b/port/win/xpress_win.cc @@ -0,0 +1,270 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port/win/xpress_win.h" +#include + +#include +#include +#include +#include + +#ifdef XPRESS + +#ifdef JEMALLOC +#include +#endif + +// Put this under ifdef so windows systems w/o this +// can still build +#include + +namespace rocksdb { +namespace port { +namespace xpress { + +// Helpers +namespace { + +auto CloseCompressorFun = [](void* h) { + if (NULL != h) { + ::CloseCompressor(reinterpret_cast(h)); + } +}; + +auto CloseDecompressorFun = [](void* h) { + if (NULL != h) { + ::CloseDecompressor(reinterpret_cast(h)); + } +}; + + +#ifdef JEMALLOC +// Make sure compressors use our jemalloc if redirected +PVOID CompressorAlloc(PVOID, SIZE_T size) { + return je_malloc(size); +} + +VOID CompressorFree(PVOID, PVOID p) { + if (p != NULL) { + je_free(p); + } +} + +#endif + +} + +bool Compress(const char* input, size_t length, std::string* output) { + + assert(input != nullptr); + assert(output != nullptr); + + if (length == 0) { + output->clear(); + return true; + } + + COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr; + +#ifdef JEMALLOC + COMPRESS_ALLOCATION_ROUTINES allocationRoutines; + + // Init. allocation routines + allocationRoutines.Allocate = CompressorAlloc; + allocationRoutines.Free = CompressorFree; + allocationRoutines.UserContext = NULL; + + allocRoutinesPtr = &allocationRoutines; +#endif + + COMPRESSOR_HANDLE compressor = NULL; + + BOOL success = CreateCompressor( + COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm + allocRoutinesPtr, // Optional allocation routine + &compressor); // Handle + + if (!success) { +#ifdef _DEBUG + std::cerr << "XPRESS: Failed to create Compressor LastError: " << + GetLastError() << std::endl; +#endif + return false; + } + + std::unique_ptr + compressorGuard(compressor, CloseCompressorFun); + + SIZE_T compressedBufferSize = 0; + + // Query compressed buffer size. + success = ::Compress( + compressor, // Compressor Handle + const_cast(input), // Input buffer + length, // Uncompressed data size + NULL, // Compressed Buffer + 0, // Compressed Buffer size + &compressedBufferSize); // Compressed Data size + + if (!success) { + + auto lastError = GetLastError(); + + if (lastError != ERROR_INSUFFICIENT_BUFFER) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to estimate compressed buffer size LastError " << + lastError << std::endl; +#endif + return false; + } + } + + assert(compressedBufferSize > 0); + + std::string result; + result.resize(compressedBufferSize); + + SIZE_T compressedDataSize = 0; + + // Compress + success = ::Compress( + compressor, // Compressor Handle + const_cast(input), // Input buffer + length, // Uncompressed data size + &result[0], // Compressed Buffer + compressedBufferSize, // Compressed Buffer size + &compressedDataSize); // Compressed Data size + + if (!success) { +#ifdef _DEBUG + std::cerr << "XPRESS: Failed to compress LastError " << + GetLastError() << std::endl; +#endif + return false; + } + + result.resize(compressedDataSize); + output->swap(result); + + return true; +} + +char* Decompress(const char* input_data, size_t input_length, + int* decompress_size) { + + assert(input_data != nullptr); + assert(decompress_size != nullptr); + + if (input_length == 0) { + return nullptr; + } + + COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr; + +#ifdef JEMALLOC + COMPRESS_ALLOCATION_ROUTINES allocationRoutines; + + // Init. allocation routines + allocationRoutines.Allocate = CompressorAlloc; + allocationRoutines.Free = CompressorFree; + allocationRoutines.UserContext = NULL; + allocRoutinesPtr = &allocationRoutines; +#endif + + DECOMPRESSOR_HANDLE decompressor = NULL; + + BOOL success = CreateDecompressor( + COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm + allocRoutinesPtr, // Optional allocation routine + &decompressor); // Handle + + + if (!success) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to create Decompressor LastError " << + GetLastError() << std::endl; +#endif + return nullptr; + } + + std::unique_ptr + compressorGuard(decompressor, CloseDecompressorFun); + + SIZE_T decompressedBufferSize = 0; + + success = ::Decompress( + decompressor, // Compressor Handle + const_cast(input_data), // Compressed data + input_length, // Compressed data size + NULL, // Buffer set to NULL + 0, // Buffer size set to 0 + &decompressedBufferSize); // Decompressed Data size + + if (!success) { + + auto lastError = GetLastError(); + + if (lastError != ERROR_INSUFFICIENT_BUFFER) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to estimate decompressed buffer size LastError " << + lastError << std::endl; +#endif + return nullptr; + } + } + + assert(decompressedBufferSize > 0); + + // On Windows we are limited to a 32-bit int for the + // output data size argument + // so we hopefully never get here + if (decompressedBufferSize > std::numeric_limits::max()) { + assert(false); + return nullptr; + } + + // The callers are deallocating using delete[] + // thus we must allocate with new[] + std::unique_ptr outputBuffer(new char[decompressedBufferSize]); + + SIZE_T decompressedDataSize = 0; + + success = ::Decompress( + decompressor, + const_cast(input_data), + input_length, + outputBuffer.get(), + decompressedBufferSize, + &decompressedDataSize); + + if (!success) { +#ifdef _DEBUG + std::cerr << + "XPRESS: Failed to decompress LastError " << + GetLastError() << std::endl; +#endif + return nullptr; + } + + *decompress_size = static_cast(decompressedDataSize); + + // Return the raw buffer to the caller supporting the tradition + return outputBuffer.release(); +} +} +} +} + +#endif + + diff --git a/port/win/xpress_win.h b/port/win/xpress_win.h new file mode 100644 index 000000000..7d1cbb68b --- /dev/null +++ b/port/win/xpress_win.h @@ -0,0 +1,26 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include + +namespace rocksdb { +namespace port { +namespace xpress { + +bool Compress(const char* input, size_t length, std::string* output); + +char* Decompress(const char* input_data, size_t input_length, + int* decompress_size); + +} +} +} + diff --git a/port/xpress.h b/port/xpress.h new file mode 100644 index 000000000..db023a2d9 --- /dev/null +++ b/port/xpress.h @@ -0,0 +1,17 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +// Xpress on Windows is implemeted using Win API +#if defined(ROCKSDB_PLATFORM_POSIX) +#error "Xpress compression not implemented" +#elif defined(OS_WIN) +#include "port/win/xpress_win.h" +#endif diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 24f4222e1..ff78548b1 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -366,6 +366,13 @@ Slice CompressBlock(const Slice& raw, return *compressed_output; } break; // fall back to no compression. + case kXpressCompression: + if (XPRESS_Compress(raw.data(), raw.size(), + compressed_output) && + GoodCompressionRatio(compressed_output->size(), raw.size())) { + return *compressed_output; + } + break; case kZSTDNotFinalCompression: if (ZSTD_Compress(compression_options, raw.data(), raw.size(), compressed_output) && diff --git a/table/format.cc b/table/format.cc index b20ecf8cf..e5a3c0c28 100644 --- a/table/format.cc +++ b/table/format.cc @@ -364,7 +364,7 @@ Status UncompressBlockContents(const char* data, size_t n, if (!Snappy_GetUncompressedLength(data, n, &ulength)) { return Status::Corruption(snappy_corrupt_msg); } - ubuf = std::unique_ptr(new char[ulength]); + ubuf.reset(new char[ulength]); if (!Snappy_Uncompress(data, n, ubuf.get())) { return Status::Corruption(snappy_corrupt_msg); } @@ -372,7 +372,7 @@ Status UncompressBlockContents(const char* data, size_t n, break; } case kZlibCompression: - ubuf = std::unique_ptr(Zlib_Uncompress( + ubuf.reset(Zlib_Uncompress( data, n, &decompress_size, GetCompressFormatForVersion(kZlibCompression, format_version))); if (!ubuf) { @@ -384,7 +384,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kBZip2Compression: - ubuf = std::unique_ptr(BZip2_Uncompress( + ubuf.reset(BZip2_Uncompress( data, n, &decompress_size, GetCompressFormatForVersion(kBZip2Compression, format_version))); if (!ubuf) { @@ -396,7 +396,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4Compression: - ubuf = std::unique_ptr(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress( data, n, &decompress_size, GetCompressFormatForVersion(kLZ4Compression, format_version))); if (!ubuf) { @@ -408,7 +408,7 @@ Status UncompressBlockContents(const char* data, size_t n, BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4HCCompression: - ubuf = std::unique_ptr(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress( data, n, &decompress_size, GetCompressFormatForVersion(kLZ4HCCompression, format_version))); if (!ubuf) { @@ -419,9 +419,18 @@ Status UncompressBlockContents(const char* data, size_t n, *contents = BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; + case kXpressCompression: + ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size)); + if (!ubuf) { + static char xpress_corrupt_msg[] = + "XPRESS not supported or corrupted XPRESS compressed block contents"; + return Status::Corruption(xpress_corrupt_msg); + } + *contents = + BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); + break; case kZSTDNotFinalCompression: - ubuf = - std::unique_ptr(ZSTD_Uncompress(data, n, &decompress_size)); + ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size)); if (!ubuf) { static char zstd_corrupt_msg[] = "ZSTD not supported or corrupted ZSTD compressed block contents"; diff --git a/table/format.h b/table/format.h index 48bcf6785..15203dfd9 100644 --- a/table/format.h +++ b/table/format.h @@ -69,6 +69,7 @@ inline uint32_t GetCompressFormatForVersion(CompressionType compression_type, uint32_t version) { // snappy is not versioned assert(compression_type != kSnappyCompression && + compression_type != kXpressCompression && compression_type != kNoCompression); // As of version 2, we encode compressed block with // compress_format_version == 2. Before that, the version is 1. diff --git a/table/table_test.cc b/table/table_test.cc index 2ed5e7775..2ab77c750 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -539,6 +539,10 @@ static std::vector GenerateArgList() { compression_types.emplace_back(kLZ4HCCompression, false); compression_types.emplace_back(kLZ4HCCompression, true); } + if (XPRESS_Supported()) { + compression_types.emplace_back(kXpressCompression, false); + compression_types.emplace_back(kXpressCompression, true); + } if (ZSTD_Supported()) { compression_types.emplace_back(kZSTDNotFinalCompression, false); compression_types.emplace_back(kZSTDNotFinalCompression, true); @@ -2073,6 +2077,13 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) { compression_state.push_back(kLZ4HCCompression); } + if (!XPRESS_Supported()) { + fprintf(stderr, "skipping xpress and xpress compression tests\n"); + } + else { + compression_state.push_back(kXpressCompression); + } + for (auto state : compression_state) { DoCompressionTest(state); } diff --git a/thirdparty.inc b/thirdparty.inc index e10bdaa4e..9fffd9bff 100644 --- a/thirdparty.inc +++ b/thirdparty.inc @@ -7,6 +7,7 @@ set(USE_GFLAGS_DEFAULT 0) # GFLAGS is disabled by default, enable with -D set(USE_SNAPPY_DEFAULT 0) # SNAPPY is disabled by default, enable with -DSNAPPY=1 cmake command line agrument set(USE_LZ4_DEFAULT 0) # LZ4 is disabled by default, enable with -DLZ4=1 cmake command line agrument set(USE_ZLIB_DEFAULT 0) # ZLIB is disabled by default, enable with -DZLIB=1 cmake command line agrument +set(USE_XPRESS_DEFAULT 0) # XPRESS is disabled by default, enable with -DXPRESS=1 cmake command line agrument set(USE_JEMALLOC_DEFAULT 0) # JEMALLOC is disabled by default, enable with -DJEMALLOC=1 cmake command line agrument set(USE_JENONINIT_DEFAULT 1) # Default is enabled do not call je_init/je_uninit as the newer versions do not have it disable with -DJENONINIT=0 @@ -189,6 +190,23 @@ else () message(STATUS "ZLIB library is disabled") endif () +if (DEFINED XPRESS) + set(USE_XPRESS ${XPRESS}) +else () + set(USE_XPRESS ${USE_XPRESS_DEFAULT}) +endif () + +if (${USE_XPRESS} EQUAL 1) + message(STATUS "XPRESS is enabled") + + add_definitions(-DXPRESS) + + # We are using the implementation provided by the system + set (SYSTEM_LIBS ${SYSTEM_LIBS} Cabinet.lib) +else () + message(STATUS "XPRESS is disabled") +endif () + # # Edit these 4 lines to define paths to Jemalloc # diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 888e96d71..5cbc6c80d 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -534,6 +534,8 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { return rocksdb::kLZ4Compression; else if (!strcasecmp(ctype, "lz4hc")) return rocksdb::kLZ4HCCompression; + else if (!strcasecmp(ctype, "xpress")) + return rocksdb::kXpressCompression; else if (!strcasecmp(ctype, "zstd")) return rocksdb::kZSTDNotFinalCompression; @@ -1604,6 +1606,10 @@ class Benchmark { ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(), input.size(), compressed); break; + case rocksdb::kXpressCompression: + ok = XPRESS_Compress(input.data(), + input.size(), compressed); + break; case rocksdb::kZSTDNotFinalCompression: ok = ZSTD_Compress(Options().compression_opts, input.data(), input.size(), compressed); @@ -2319,6 +2325,11 @@ class Benchmark { &decompress_size, 2); ok = uncompressed != nullptr; break; + case rocksdb::kXpressCompression: + uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(), + &decompress_size); + ok = uncompressed != nullptr; + break; case rocksdb::kZSTDNotFinalCompression: uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(), &decompress_size); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 3e9c40bd6..f2e157906 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -368,6 +368,8 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) { return rocksdb::kLZ4Compression; else if (!strcasecmp(ctype, "lz4hc")) return rocksdb::kLZ4HCCompression; + else if (!strcasecmp(ctype, "xpress")) + return rocksdb::kXpressCompression; else if (!strcasecmp(ctype, "zstd")) return rocksdb::kZSTDNotFinalCompression; diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index d3781e3f6..08a99c9f0 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -300,6 +300,8 @@ Options LDBCommand::PrepareOptionsForOpenDB() { opt.compression = kLZ4Compression; } else if (comp == "lz4hc") { opt.compression = kLZ4HCCompression; + } else if (comp == "xpress") { + opt.compression = kXpressCompression; } else if (comp == "zstd") { opt.compression = kZSTDNotFinalCompression; } else { diff --git a/tools/sst_dump_tool.cc b/tools/sst_dump_tool.cc index 6890c9cdc..52bfb554d 100644 --- a/tools/sst_dump_tool.cc +++ b/tools/sst_dump_tool.cc @@ -180,35 +180,27 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) { std::vector > block_based_table_factories; - std::map compress_type; - compress_type.insert( - std::make_pair(CompressionType::kNoCompression, "kNoCompression")); - compress_type.insert(std::make_pair(CompressionType::kSnappyCompression, - "kSnappyCompression")); - compress_type.insert( - std::make_pair(CompressionType::kZlibCompression, "kZlibCompression")); - compress_type.insert( - std::make_pair(CompressionType::kBZip2Compression, "kBZip2Compression")); - compress_type.insert( - std::make_pair(CompressionType::kLZ4Compression, "kLZ4Compression")); - compress_type.insert( - std::make_pair(CompressionType::kLZ4HCCompression, "kLZ4HCCompression")); - compress_type.insert(std::make_pair(CompressionType::kZSTDNotFinalCompression, - "kZSTDNotFinalCompression")); - fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); - for (CompressionType i = CompressionType::kNoCompression; - i <= CompressionType::kZSTDNotFinalCompression; - i = (i == kLZ4HCCompression) ? kZSTDNotFinalCompression - : CompressionType(i + 1)) { + std::pair compressions[] = { + { CompressionType::kNoCompression, "kNoCompression" }, + { CompressionType::kSnappyCompression, "kSnappyCompression" }, + { CompressionType::kZlibCompression, "kZlibCompression" }, + { CompressionType::kBZip2Compression, "kBZip2Compression" }, + { CompressionType::kLZ4Compression, "kLZ4Compression" }, + { CompressionType::kLZ4HCCompression, "kLZ4HCCompression" }, + { CompressionType::kXpressCompression, "kXpressCompression" }, + { CompressionType::kZSTDNotFinalCompression, "kZSTDNotFinalCompression" } + }; + + for (auto& i : compressions) { CompressionOptions compress_opt; std::string column_family_name; - TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i, + TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i.first, compress_opt, false /* skip_filters */, column_family_name); uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size); - fprintf(stdout, "Compression: %s", compress_type.find(i)->second); + fprintf(stdout, "Compression: %s", i.second); fprintf(stdout, " Size: %" PRIu64 "\n", file_size); } return 0; diff --git a/util/compression.h b/util/compression.h index 2690e3001..20e52833f 100644 --- a/util/compression.h +++ b/util/compression.h @@ -37,6 +37,10 @@ #include #endif +#if defined(XPRESS) +#include "port/xpress.h" +#endif + namespace rocksdb { inline bool Snappy_Supported() { @@ -67,6 +71,13 @@ inline bool LZ4_Supported() { return false; } +inline bool XPRESS_Supported() { +#ifdef XPRESS + return true; +#endif + return false; +} + inline bool ZSTD_Supported() { #ifdef ZSTD return true; @@ -88,6 +99,8 @@ inline bool CompressionTypeSupported(CompressionType compression_type) { return LZ4_Supported(); case kLZ4HCCompression: return LZ4_Supported(); + case kXpressCompression: + return XPRESS_Supported(); case kZSTDNotFinalCompression: return ZSTD_Supported(); default: @@ -110,6 +123,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) { return "LZ4"; case kLZ4HCCompression: return "LZ4HC"; + case kXpressCompression: + return "Xpress"; case kZSTDNotFinalCompression: return "ZSTD"; default: @@ -606,6 +621,22 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, return false; } +inline bool XPRESS_Compress(const char* input, size_t length, std::string* output) { +#ifdef XPRESS + return port::xpress::Compress(input, length, output); +#endif + return false; +} + +inline char* XPRESS_Uncompress(const char* input_data, size_t input_length, + int* decompress_size) { +#ifdef XPRESS + return port::xpress::Decompress(input_data, input_length, decompress_size); +#endif + return nullptr; +} + + inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, size_t length, ::std::string* output) { #ifdef ZSTD diff --git a/util/options_helper.h b/util/options_helper.h index 953f2494d..e81365e93 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -572,6 +572,7 @@ static std::unordered_map {"kBZip2Compression", kBZip2Compression}, {"kLZ4Compression", kLZ4Compression}, {"kLZ4HCCompression", kLZ4HCCompression}, + {"kXpressCompression", kXpressCompression }, {"kZSTDNotFinalCompression", kZSTDNotFinalCompression}}; static std::unordered_map diff --git a/util/options_test.cc b/util/options_test.cc index 405e2b36a..7ca41291f 100644 --- a/util/options_test.cc +++ b/util/options_test.cc @@ -101,9 +101,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { "kBZip2Compression:" "kLZ4Compression:" "kLZ4HCCompression:" + "kXpressCompression:" "kZSTDNotFinalCompression"}, {"compression_opts", "4:5:6"}, - {"num_levels", "7"}, + {"num_levels", "8"}, {"level0_file_num_compaction_trigger", "8"}, {"level0_slowdown_writes_trigger", "9"}, {"level0_stop_writes_trigger", "10"}, @@ -188,18 +189,19 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); - ASSERT_EQ(new_cf_opt.compression_per_level.size(), 7U); + ASSERT_EQ(new_cf_opt.compression_per_level.size(), 8U); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); ASSERT_EQ(new_cf_opt.compression_per_level[1], kSnappyCompression); ASSERT_EQ(new_cf_opt.compression_per_level[2], kZlibCompression); ASSERT_EQ(new_cf_opt.compression_per_level[3], kBZip2Compression); ASSERT_EQ(new_cf_opt.compression_per_level[4], kLZ4Compression); ASSERT_EQ(new_cf_opt.compression_per_level[5], kLZ4HCCompression); - ASSERT_EQ(new_cf_opt.compression_per_level[6], kZSTDNotFinalCompression); + ASSERT_EQ(new_cf_opt.compression_per_level[6], kXpressCompression); + ASSERT_EQ(new_cf_opt.compression_per_level[7], kZSTDNotFinalCompression); ASSERT_EQ(new_cf_opt.compression_opts.window_bits, 4); ASSERT_EQ(new_cf_opt.compression_opts.level, 5); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); - ASSERT_EQ(new_cf_opt.num_levels, 7); + ASSERT_EQ(new_cf_opt.num_levels, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9); ASSERT_EQ(new_cf_opt.level0_stop_writes_trigger, 10);