Introduce XPRESS compresssion on Windows. (#1081)

Comparable with Snappy on comp ratio.
  Implemented using Windows API, does not require external package.
  Avaiable since Windows 8 and server 2012.
  Use -DXPRESS=1 with CMake to enable.
main
Dmitri Smirnov 10 years ago committed by Siying Dong
parent 874c96ac1d
commit ee221d2de0
  1. 5
      CMakeLists.txt
  2. 3
      db/compaction_job_stats_test.cc
  3. 40
      db/db_test.cc
  4. 5
      db/db_wal_test.cc
  5. 1
      include/rocksdb/options.h
  6. 17
      port/win/port_win.h
  7. 270
      port/win/xpress_win.cc
  8. 26
      port/win/xpress_win.h
  9. 17
      port/xpress.h
  10. 7
      table/block_based_table_builder.cc
  11. 23
      table/format.cc
  12. 1
      table/format.h
  13. 11
      table/table_test.cc
  14. 18
      thirdparty.inc
  15. 11
      tools/db_bench_tool.cc
  16. 2
      tools/db_stress.cc
  17. 2
      tools/ldb_cmd.cc
  18. 36
      tools/sst_dump_tool.cc
  19. 31
      util/compression.h
  20. 1
      util/options_helper.h
  21. 10
      util/options_test.cc

@ -82,7 +82,7 @@ set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /Oxt /Zp8 /Gm- /Gy /MD")
set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG") set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG")
add_definitions(-DWIN32 -DOS_WIN -D_MBCS -DWIN64) add_definitions(-DWIN32 -DOS_WIN -D_MBCS -DWIN64 -DNOMINMAX)
include_directories(${PROJECT_SOURCE_DIR}) include_directories(${PROJECT_SOURCE_DIR})
include_directories(${PROJECT_SOURCE_DIR}/include) include_directories(${PROJECT_SOURCE_DIR}/include)
@ -90,7 +90,7 @@ include_directories(${PROJECT_SOURCE_DIR}/third-party/gtest-1.7.0/fused-src)
set(ROCKSDB_LIBS rocksdblib${ARTIFACT_SUFFIX}) set(ROCKSDB_LIBS rocksdblib${ARTIFACT_SUFFIX})
set(THIRDPARTY_LIBS ${THIRDPARTY_LIBS} gtest) set(THIRDPARTY_LIBS ${THIRDPARTY_LIBS} gtest)
set(SYSTEM_LIBS Shlwapi.lib Rpcrt4.lib) set(SYSTEM_LIBS ${SYSTEM_LIBS} Shlwapi.lib Rpcrt4.lib)
set(LIBS ${ROCKSDB_LIBS} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS}) set(LIBS ${ROCKSDB_LIBS} ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
@ -156,6 +156,7 @@ set(SOURCES
port/win/env_win.cc port/win/env_win.cc
port/win/port_win.cc port/win/port_win.cc
port/win/win_logger.cc port/win/win_logger.cc
port/win/xpress_win.cc
table/adaptive_table_factory.cc table/adaptive_table_factory.cc
table/block.cc table/block.cc
table/block_based_filter_block.cc table/block_based_filter_block.cc

@ -624,7 +624,10 @@ CompressionType GetAnyCompression() {
return kBZip2Compression; return kBZip2Compression;
} else if (LZ4_Supported()) { } else if (LZ4_Supported()) {
return kLZ4Compression; return kLZ4Compression;
} else if (XPRESS_Supported()) {
return kXpressCompression;
} }
return kNoCompression; return kNoCompression;
} }

@ -3184,8 +3184,8 @@ class ModelDB : public DB {
virtual Status GetUpdatesSince( virtual Status GetUpdatesSince(
rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*, rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
const TransactionLogIterator::ReadOptions& read_options = const TransactionLogIterator::ReadOptions&
TransactionLogIterator::ReadOptions()) override { read_options = TransactionLogIterator::ReadOptions()) override {
return Status::NotSupported("Not supported in Model DB"); return Status::NotSupported("Not supported in Model DB");
} }
@ -3271,7 +3271,8 @@ static bool CompareIterators(int step, DB* model, DB* db,
ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) { ok && miter->Valid() && dbiter->Valid(); miter->Next(), dbiter->Next()) {
count++; count++;
if (miter->key().compare(dbiter->key()) != 0) { if (miter->key().compare(dbiter->key()) != 0) {
fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n", step, fprintf(stderr, "step %d: Key mismatch: '%s' vs. '%s'\n",
step,
EscapeString(miter->key()).c_str(), EscapeString(miter->key()).c_str(),
EscapeString(dbiter->key()).c_str()); EscapeString(dbiter->key()).c_str());
ok = false; ok = false;
@ -3474,6 +3475,7 @@ TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(1)); options.prefix_extractor.reset(NewFixedPrefixTransform(1));
Reopen(options); Reopen(options);
ASSERT_OK(Put("k1", "v1")); ASSERT_OK(Put("k1", "v1"));
Flush(); Flush();
@ -3672,15 +3674,15 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
} }
TEST_F(DBTest, MmapAndBufferOptions) {
Options options = CurrentOptions();
// If allow_mmap_reads is on allow_os_buffer must also be on
// On Windows you can have either memory mapped file or a file // On Windows you can have either memory mapped file or a file
// with unbuffered access. // with unbuffered access. So this asserts and does not make
// sense to run
#ifndef OS_WIN #ifndef OS_WIN
TEST_F(DBTest, MmapAndBufferOptions) {
Options options = CurrentOptions();
options.allow_os_buffer = false; options.allow_os_buffer = false;
#endif
options.allow_mmap_reads = true; options.allow_mmap_reads = true;
ASSERT_NOK(TryReopen(options)); ASSERT_NOK(TryReopen(options));
@ -3695,6 +3697,7 @@ TEST_F(DBTest, MmapAndBufferOptions) {
options.allow_os_buffer = true; options.allow_os_buffer = true;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
} }
#endif
TEST_F(DBTest, ConcurrentMemtableNotSupported) { TEST_F(DBTest, ConcurrentMemtableNotSupported) {
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -4948,10 +4951,12 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
// iter 1 -- bzip2 // iter 1 -- bzip2
// iter 2 -- lz4 // iter 2 -- lz4
// iter 3 -- lz4HC // iter 3 -- lz4HC
// iter 4 -- xpress
CompressionType compressions[] = {kZlibCompression, kBZip2Compression, CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression}; kLZ4Compression, kLZ4HCCompression,
for (int iter = 0; iter < 4; ++iter) { kXpressCompression};
if (!CompressionTypeSupported(compressions[iter])) { for (auto comp : compressions) {
if (!CompressionTypeSupported(comp)) {
continue; continue;
} }
// first_table_version 1 -- generate with table_version == 1, read with // first_table_version 1 -- generate with table_version == 1, read with
@ -4966,7 +4971,7 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.table_factory.reset(NewBlockBasedTableFactory(table_options)); options.table_factory.reset(NewBlockBasedTableFactory(table_options));
options.create_if_missing = true; options.create_if_missing = true;
options.compression = compressions[iter]; options.compression = comp;
DestroyAndReopen(options); DestroyAndReopen(options);
int kNumKeysWritten = 100000; int kNumKeysWritten = 100000;
@ -5809,18 +5814,19 @@ TEST_F(DBTest, LastWriteBufferDelay) {
TEST_F(DBTest, FailWhenCompressionNotSupportedTest) { TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
CompressionType compressions[] = {kZlibCompression, kBZip2Compression, CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
kLZ4Compression, kLZ4HCCompression}; kLZ4Compression, kLZ4HCCompression,
for (int iter = 0; iter < 4; ++iter) { kXpressCompression};
if (!CompressionTypeSupported(compressions[iter])) { for (auto comp : compressions) {
if (!CompressionTypeSupported(comp)) {
// not supported, we should fail the Open() // not supported, we should fail the Open()
Options options = CurrentOptions(); Options options = CurrentOptions();
options.compression = compressions[iter]; options.compression = comp;
ASSERT_TRUE(!TryReopen(options).ok()); ASSERT_TRUE(!TryReopen(options).ok());
// Try if CreateColumnFamily also fails // Try if CreateColumnFamily also fails
options.compression = kNoCompression; options.compression = kNoCompression;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
ColumnFamilyOptions cf_options(options); ColumnFamilyOptions cf_options(options);
cf_options.compression = compressions[iter]; cf_options.compression = comp;
ColumnFamilyHandle* handle; ColumnFamilyHandle* handle;
ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok()); ASSERT_TRUE(!db_->CreateColumnFamily(cf_options, "name", &handle).ok());
} }

@ -584,8 +584,11 @@ class RecoveryTestHelper {
int fd = open(filename.c_str(), O_RDWR); int fd = open(filename.c_str(), O_RDWR);
// On windows long is 32-bit
ASSERT_LE(offset, std::numeric_limits<long>::max());
ASSERT_GT(fd, 0); ASSERT_GT(fd, 0);
ASSERT_EQ(offset, lseek(fd, offset, SEEK_SET)); ASSERT_EQ(offset, lseek(fd, static_cast<long>(offset), SEEK_SET));
void* buf = alloca(len); void* buf = alloca(len);
memset(buf, 'a', len); memset(buf, 'a', len);

@ -61,6 +61,7 @@ enum CompressionType : char {
kBZip2Compression = 0x3, kBZip2Compression = 0x3,
kLZ4Compression = 0x4, kLZ4Compression = 0x4,
kLZ4HCCompression = 0x5, kLZ4HCCompression = 0x5,
kXpressCompression = 0x6,
// zstd format is not finalized yet so it's subject to changes. // zstd format is not finalized yet so it's subject to changes.
kZSTDNotFinalCompression = 0x40, kZSTDNotFinalCompression = 0x40,
}; };

@ -56,23 +56,6 @@ typedef SSIZE_T ssize_t;
#define __attribute__(A) #define __attribute__(A)
#ifdef ZLIB
#include <zlib.h>
#endif
#ifdef BZIP2
#include <bzlib.h>
#endif
#if defined(LZ4)
#include <lz4.h>
#include <lz4hc.h>
#endif
#ifdef SNAPPY
#include <snappy.h>
#endif
// Thread local storage on Linux // Thread local storage on Linux
// There is thread_local in C++11 // There is thread_local in C++11
#ifndef __thread #ifndef __thread

@ -0,0 +1,270 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "port/win/xpress_win.h"
#include <Windows.h>
#include <cassert>
#include <memory>
#include <limits>
#include <iostream>
#ifdef XPRESS
#ifdef JEMALLOC
#include <jemalloc/jemalloc.h>
#endif
// Put this under ifdef so windows systems w/o this
// can still build
#include <compressapi.h>
namespace rocksdb {
namespace port {
namespace xpress {
// Helpers
namespace {
auto CloseCompressorFun = [](void* h) {
if (NULL != h) {
::CloseCompressor(reinterpret_cast<COMPRESSOR_HANDLE>(h));
}
};
auto CloseDecompressorFun = [](void* h) {
if (NULL != h) {
::CloseDecompressor(reinterpret_cast<DECOMPRESSOR_HANDLE>(h));
}
};
#ifdef JEMALLOC
// Make sure compressors use our jemalloc if redirected
PVOID CompressorAlloc(PVOID, SIZE_T size) {
return je_malloc(size);
}
VOID CompressorFree(PVOID, PVOID p) {
if (p != NULL) {
je_free(p);
}
}
#endif
}
bool Compress(const char* input, size_t length, std::string* output) {
assert(input != nullptr);
assert(output != nullptr);
if (length == 0) {
output->clear();
return true;
}
COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;
#ifdef JEMALLOC
COMPRESS_ALLOCATION_ROUTINES allocationRoutines;
// Init. allocation routines
allocationRoutines.Allocate = CompressorAlloc;
allocationRoutines.Free = CompressorFree;
allocationRoutines.UserContext = NULL;
allocRoutinesPtr = &allocationRoutines;
#endif
COMPRESSOR_HANDLE compressor = NULL;
BOOL success = CreateCompressor(
COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm
allocRoutinesPtr, // Optional allocation routine
&compressor); // Handle
if (!success) {
#ifdef _DEBUG
std::cerr << "XPRESS: Failed to create Compressor LastError: " <<
GetLastError() << std::endl;
#endif
return false;
}
std::unique_ptr<void, decltype(CloseCompressorFun)>
compressorGuard(compressor, CloseCompressorFun);
SIZE_T compressedBufferSize = 0;
// Query compressed buffer size.
success = ::Compress(
compressor, // Compressor Handle
const_cast<char*>(input), // Input buffer
length, // Uncompressed data size
NULL, // Compressed Buffer
0, // Compressed Buffer size
&compressedBufferSize); // Compressed Data size
if (!success) {
auto lastError = GetLastError();
if (lastError != ERROR_INSUFFICIENT_BUFFER) {
#ifdef _DEBUG
std::cerr <<
"XPRESS: Failed to estimate compressed buffer size LastError " <<
lastError << std::endl;
#endif
return false;
}
}
assert(compressedBufferSize > 0);
std::string result;
result.resize(compressedBufferSize);
SIZE_T compressedDataSize = 0;
// Compress
success = ::Compress(
compressor, // Compressor Handle
const_cast<char*>(input), // Input buffer
length, // Uncompressed data size
&result[0], // Compressed Buffer
compressedBufferSize, // Compressed Buffer size
&compressedDataSize); // Compressed Data size
if (!success) {
#ifdef _DEBUG
std::cerr << "XPRESS: Failed to compress LastError " <<
GetLastError() << std::endl;
#endif
return false;
}
result.resize(compressedDataSize);
output->swap(result);
return true;
}
char* Decompress(const char* input_data, size_t input_length,
int* decompress_size) {
assert(input_data != nullptr);
assert(decompress_size != nullptr);
if (input_length == 0) {
return nullptr;
}
COMPRESS_ALLOCATION_ROUTINES* allocRoutinesPtr = nullptr;
#ifdef JEMALLOC
COMPRESS_ALLOCATION_ROUTINES allocationRoutines;
// Init. allocation routines
allocationRoutines.Allocate = CompressorAlloc;
allocationRoutines.Free = CompressorFree;
allocationRoutines.UserContext = NULL;
allocRoutinesPtr = &allocationRoutines;
#endif
DECOMPRESSOR_HANDLE decompressor = NULL;
BOOL success = CreateDecompressor(
COMPRESS_ALGORITHM_XPRESS, // Compression Algorithm
allocRoutinesPtr, // Optional allocation routine
&decompressor); // Handle
if (!success) {
#ifdef _DEBUG
std::cerr <<
"XPRESS: Failed to create Decompressor LastError " <<
GetLastError() << std::endl;
#endif
return nullptr;
}
std::unique_ptr<void, decltype(CloseDecompressorFun)>
compressorGuard(decompressor, CloseDecompressorFun);
SIZE_T decompressedBufferSize = 0;
success = ::Decompress(
decompressor, // Compressor Handle
const_cast<char*>(input_data), // Compressed data
input_length, // Compressed data size
NULL, // Buffer set to NULL
0, // Buffer size set to 0
&decompressedBufferSize); // Decompressed Data size
if (!success) {
auto lastError = GetLastError();
if (lastError != ERROR_INSUFFICIENT_BUFFER) {
#ifdef _DEBUG
std::cerr <<
"XPRESS: Failed to estimate decompressed buffer size LastError " <<
lastError << std::endl;
#endif
return nullptr;
}
}
assert(decompressedBufferSize > 0);
// On Windows we are limited to a 32-bit int for the
// output data size argument
// so we hopefully never get here
if (decompressedBufferSize > std::numeric_limits<int>::max()) {
assert(false);
return nullptr;
}
// The callers are deallocating using delete[]
// thus we must allocate with new[]
std::unique_ptr<char[]> outputBuffer(new char[decompressedBufferSize]);
SIZE_T decompressedDataSize = 0;
success = ::Decompress(
decompressor,
const_cast<char*>(input_data),
input_length,
outputBuffer.get(),
decompressedBufferSize,
&decompressedDataSize);
if (!success) {
#ifdef _DEBUG
std::cerr <<
"XPRESS: Failed to decompress LastError " <<
GetLastError() << std::endl;
#endif
return nullptr;
}
*decompress_size = static_cast<int>(decompressedDataSize);
// Return the raw buffer to the caller supporting the tradition
return outputBuffer.release();
}
}
}
}
#endif

@ -0,0 +1,26 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
namespace rocksdb {
namespace port {
namespace xpress {
bool Compress(const char* input, size_t length, std::string* output);
char* Decompress(const char* input_data, size_t input_length,
int* decompress_size);
}
}
}

@ -0,0 +1,17 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
// Xpress on Windows is implemeted using Win API
#if defined(ROCKSDB_PLATFORM_POSIX)
#error "Xpress compression not implemented"
#elif defined(OS_WIN)
#include "port/win/xpress_win.h"
#endif

@ -366,6 +366,13 @@ Slice CompressBlock(const Slice& raw,
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kXpressCompression:
if (XPRESS_Compress(raw.data(), raw.size(),
compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output;
}
break;
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_options, raw.data(), raw.size(), if (ZSTD_Compress(compression_options, raw.data(), raw.size(),
compressed_output) && compressed_output) &&

@ -364,7 +364,7 @@ Status UncompressBlockContents(const char* data, size_t n,
if (!Snappy_GetUncompressedLength(data, n, &ulength)) { if (!Snappy_GetUncompressedLength(data, n, &ulength)) {
return Status::Corruption(snappy_corrupt_msg); return Status::Corruption(snappy_corrupt_msg);
} }
ubuf = std::unique_ptr<char[]>(new char[ulength]); ubuf.reset(new char[ulength]);
if (!Snappy_Uncompress(data, n, ubuf.get())) { if (!Snappy_Uncompress(data, n, ubuf.get())) {
return Status::Corruption(snappy_corrupt_msg); return Status::Corruption(snappy_corrupt_msg);
} }
@ -372,7 +372,7 @@ Status UncompressBlockContents(const char* data, size_t n,
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf = std::unique_ptr<char[]>(Zlib_Uncompress( ubuf.reset(Zlib_Uncompress(
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version))); GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) { if (!ubuf) {
@ -384,7 +384,7 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kBZip2Compression: case kBZip2Compression:
ubuf = std::unique_ptr<char[]>(BZip2_Uncompress( ubuf.reset(BZip2_Uncompress(
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kBZip2Compression, format_version))); GetCompressFormatForVersion(kBZip2Compression, format_version)));
if (!ubuf) { if (!ubuf) {
@ -396,7 +396,7 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf = std::unique_ptr<char[]>(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version))); GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) { if (!ubuf) {
@ -408,7 +408,7 @@ Status UncompressBlockContents(const char* data, size_t n,
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf = std::unique_ptr<char[]>(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version))); GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) { if (!ubuf) {
@ -419,9 +419,18 @@ Status UncompressBlockContents(const char* data, size_t n,
*contents = *contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kXpressCompression:
ubuf.reset(XPRESS_Uncompress(data, n, &decompress_size));
if (!ubuf) {
static char xpress_corrupt_msg[] =
"XPRESS not supported or corrupted XPRESS compressed block contents";
return Status::Corruption(xpress_corrupt_msg);
}
*contents =
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break;
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
ubuf = ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size));
std::unique_ptr<char[]>(ZSTD_Uncompress(data, n, &decompress_size));
if (!ubuf) { if (!ubuf) {
static char zstd_corrupt_msg[] = static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents"; "ZSTD not supported or corrupted ZSTD compressed block contents";

@ -69,6 +69,7 @@ inline uint32_t GetCompressFormatForVersion(CompressionType compression_type,
uint32_t version) { uint32_t version) {
// snappy is not versioned // snappy is not versioned
assert(compression_type != kSnappyCompression && assert(compression_type != kSnappyCompression &&
compression_type != kXpressCompression &&
compression_type != kNoCompression); compression_type != kNoCompression);
// As of version 2, we encode compressed block with // As of version 2, we encode compressed block with
// compress_format_version == 2. Before that, the version is 1. // compress_format_version == 2. Before that, the version is 1.

@ -539,6 +539,10 @@ static std::vector<TestArgs> GenerateArgList() {
compression_types.emplace_back(kLZ4HCCompression, false); compression_types.emplace_back(kLZ4HCCompression, false);
compression_types.emplace_back(kLZ4HCCompression, true); compression_types.emplace_back(kLZ4HCCompression, true);
} }
if (XPRESS_Supported()) {
compression_types.emplace_back(kXpressCompression, false);
compression_types.emplace_back(kXpressCompression, true);
}
if (ZSTD_Supported()) { if (ZSTD_Supported()) {
compression_types.emplace_back(kZSTDNotFinalCompression, false); compression_types.emplace_back(kZSTDNotFinalCompression, false);
compression_types.emplace_back(kZSTDNotFinalCompression, true); compression_types.emplace_back(kZSTDNotFinalCompression, true);
@ -2073,6 +2077,13 @@ TEST_F(GeneralTableTest, ApproximateOffsetOfCompressed) {
compression_state.push_back(kLZ4HCCompression); compression_state.push_back(kLZ4HCCompression);
} }
if (!XPRESS_Supported()) {
fprintf(stderr, "skipping xpress and xpress compression tests\n");
}
else {
compression_state.push_back(kXpressCompression);
}
for (auto state : compression_state) { for (auto state : compression_state) {
DoCompressionTest(state); DoCompressionTest(state);
} }

@ -7,6 +7,7 @@ set(USE_GFLAGS_DEFAULT 0) # GFLAGS is disabled by default, enable with -D
set(USE_SNAPPY_DEFAULT 0) # SNAPPY is disabled by default, enable with -DSNAPPY=1 cmake command line agrument set(USE_SNAPPY_DEFAULT 0) # SNAPPY is disabled by default, enable with -DSNAPPY=1 cmake command line agrument
set(USE_LZ4_DEFAULT 0) # LZ4 is disabled by default, enable with -DLZ4=1 cmake command line agrument set(USE_LZ4_DEFAULT 0) # LZ4 is disabled by default, enable with -DLZ4=1 cmake command line agrument
set(USE_ZLIB_DEFAULT 0) # ZLIB is disabled by default, enable with -DZLIB=1 cmake command line agrument set(USE_ZLIB_DEFAULT 0) # ZLIB is disabled by default, enable with -DZLIB=1 cmake command line agrument
set(USE_XPRESS_DEFAULT 0) # XPRESS is disabled by default, enable with -DXPRESS=1 cmake command line agrument
set(USE_JEMALLOC_DEFAULT 0) # JEMALLOC is disabled by default, enable with -DJEMALLOC=1 cmake command line agrument set(USE_JEMALLOC_DEFAULT 0) # JEMALLOC is disabled by default, enable with -DJEMALLOC=1 cmake command line agrument
set(USE_JENONINIT_DEFAULT 1) # Default is enabled do not call je_init/je_uninit as the newer versions do not have it disable with -DJENONINIT=0 set(USE_JENONINIT_DEFAULT 1) # Default is enabled do not call je_init/je_uninit as the newer versions do not have it disable with -DJENONINIT=0
@ -189,6 +190,23 @@ else ()
message(STATUS "ZLIB library is disabled") message(STATUS "ZLIB library is disabled")
endif () endif ()
if (DEFINED XPRESS)
set(USE_XPRESS ${XPRESS})
else ()
set(USE_XPRESS ${USE_XPRESS_DEFAULT})
endif ()
if (${USE_XPRESS} EQUAL 1)
message(STATUS "XPRESS is enabled")
add_definitions(-DXPRESS)
# We are using the implementation provided by the system
set (SYSTEM_LIBS ${SYSTEM_LIBS} Cabinet.lib)
else ()
message(STATUS "XPRESS is disabled")
endif ()
# #
# Edit these 4 lines to define paths to Jemalloc # Edit these 4 lines to define paths to Jemalloc
# #

@ -534,6 +534,8 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kLZ4Compression; return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc")) else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression; return rocksdb::kLZ4HCCompression;
else if (!strcasecmp(ctype, "xpress"))
return rocksdb::kXpressCompression;
else if (!strcasecmp(ctype, "zstd")) else if (!strcasecmp(ctype, "zstd"))
return rocksdb::kZSTDNotFinalCompression; return rocksdb::kZSTDNotFinalCompression;
@ -1604,6 +1606,10 @@ class Benchmark {
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(), ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kXpressCompression:
ok = XPRESS_Compress(input.data(),
input.size(), compressed);
break;
case rocksdb::kZSTDNotFinalCompression: case rocksdb::kZSTDNotFinalCompression:
ok = ZSTD_Compress(Options().compression_opts, input.data(), ok = ZSTD_Compress(Options().compression_opts, input.data(),
input.size(), compressed); input.size(), compressed);
@ -2319,6 +2325,11 @@ class Benchmark {
&decompress_size, 2); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kXpressCompression:
uncompressed = XPRESS_Uncompress(compressed.data(), compressed.size(),
&decompress_size);
ok = uncompressed != nullptr;
break;
case rocksdb::kZSTDNotFinalCompression: case rocksdb::kZSTDNotFinalCompression:
uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(), uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(),
&decompress_size); &decompress_size);

@ -368,6 +368,8 @@ enum rocksdb::CompressionType StringToCompressionType(const char* ctype) {
return rocksdb::kLZ4Compression; return rocksdb::kLZ4Compression;
else if (!strcasecmp(ctype, "lz4hc")) else if (!strcasecmp(ctype, "lz4hc"))
return rocksdb::kLZ4HCCompression; return rocksdb::kLZ4HCCompression;
else if (!strcasecmp(ctype, "xpress"))
return rocksdb::kXpressCompression;
else if (!strcasecmp(ctype, "zstd")) else if (!strcasecmp(ctype, "zstd"))
return rocksdb::kZSTDNotFinalCompression; return rocksdb::kZSTDNotFinalCompression;

@ -300,6 +300,8 @@ Options LDBCommand::PrepareOptionsForOpenDB() {
opt.compression = kLZ4Compression; opt.compression = kLZ4Compression;
} else if (comp == "lz4hc") { } else if (comp == "lz4hc") {
opt.compression = kLZ4HCCompression; opt.compression = kLZ4HCCompression;
} else if (comp == "xpress") {
opt.compression = kXpressCompression;
} else if (comp == "zstd") { } else if (comp == "zstd") {
opt.compression = kZSTDNotFinalCompression; opt.compression = kZSTDNotFinalCompression;
} else { } else {

@ -180,35 +180,27 @@ int SstFileReader::ShowAllCompressionSizes(size_t block_size) {
std::vector<std::unique_ptr<IntTblPropCollectorFactory> > std::vector<std::unique_ptr<IntTblPropCollectorFactory> >
block_based_table_factories; block_based_table_factories;
std::map<CompressionType, const char*> compress_type;
compress_type.insert(
std::make_pair(CompressionType::kNoCompression, "kNoCompression"));
compress_type.insert(std::make_pair(CompressionType::kSnappyCompression,
"kSnappyCompression"));
compress_type.insert(
std::make_pair(CompressionType::kZlibCompression, "kZlibCompression"));
compress_type.insert(
std::make_pair(CompressionType::kBZip2Compression, "kBZip2Compression"));
compress_type.insert(
std::make_pair(CompressionType::kLZ4Compression, "kLZ4Compression"));
compress_type.insert(
std::make_pair(CompressionType::kLZ4HCCompression, "kLZ4HCCompression"));
compress_type.insert(std::make_pair(CompressionType::kZSTDNotFinalCompression,
"kZSTDNotFinalCompression"));
fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size); fprintf(stdout, "Block Size: %" ROCKSDB_PRIszt "\n", block_size);
for (CompressionType i = CompressionType::kNoCompression; std::pair<CompressionType,const char*> compressions[] = {
i <= CompressionType::kZSTDNotFinalCompression; { CompressionType::kNoCompression, "kNoCompression" },
i = (i == kLZ4HCCompression) ? kZSTDNotFinalCompression { CompressionType::kSnappyCompression, "kSnappyCompression" },
: CompressionType(i + 1)) { { CompressionType::kZlibCompression, "kZlibCompression" },
{ CompressionType::kBZip2Compression, "kBZip2Compression" },
{ CompressionType::kLZ4Compression, "kLZ4Compression" },
{ CompressionType::kLZ4HCCompression, "kLZ4HCCompression" },
{ CompressionType::kXpressCompression, "kXpressCompression" },
{ CompressionType::kZSTDNotFinalCompression, "kZSTDNotFinalCompression" }
};
for (auto& i : compressions) {
CompressionOptions compress_opt; CompressionOptions compress_opt;
std::string column_family_name; std::string column_family_name;
TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i, TableBuilderOptions tb_opts(imoptions, ikc, &block_based_table_factories, i.first,
compress_opt, false /* skip_filters */, compress_opt, false /* skip_filters */,
column_family_name); column_family_name);
uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size); uint64_t file_size = CalculateCompressedTableSize(tb_opts, block_size);
fprintf(stdout, "Compression: %s", compress_type.find(i)->second); fprintf(stdout, "Compression: %s", i.second);
fprintf(stdout, " Size: %" PRIu64 "\n", file_size); fprintf(stdout, " Size: %" PRIu64 "\n", file_size);
} }
return 0; return 0;

@ -37,6 +37,10 @@
#include <zstd.h> #include <zstd.h>
#endif #endif
#if defined(XPRESS)
#include "port/xpress.h"
#endif
namespace rocksdb { namespace rocksdb {
inline bool Snappy_Supported() { inline bool Snappy_Supported() {
@ -67,6 +71,13 @@ inline bool LZ4_Supported() {
return false; return false;
} }
inline bool XPRESS_Supported() {
#ifdef XPRESS
return true;
#endif
return false;
}
inline bool ZSTD_Supported() { inline bool ZSTD_Supported() {
#ifdef ZSTD #ifdef ZSTD
return true; return true;
@ -88,6 +99,8 @@ inline bool CompressionTypeSupported(CompressionType compression_type) {
return LZ4_Supported(); return LZ4_Supported();
case kLZ4HCCompression: case kLZ4HCCompression:
return LZ4_Supported(); return LZ4_Supported();
case kXpressCompression:
return XPRESS_Supported();
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
return ZSTD_Supported(); return ZSTD_Supported();
default: default:
@ -110,6 +123,8 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
return "LZ4"; return "LZ4";
case kLZ4HCCompression: case kLZ4HCCompression:
return "LZ4HC"; return "LZ4HC";
case kXpressCompression:
return "Xpress";
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
return "ZSTD"; return "ZSTD";
default: default:
@ -606,6 +621,22 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts,
return false; return false;
} }
inline bool XPRESS_Compress(const char* input, size_t length, std::string* output) {
#ifdef XPRESS
return port::xpress::Compress(input, length, output);
#endif
return false;
}
inline char* XPRESS_Uncompress(const char* input_data, size_t input_length,
int* decompress_size) {
#ifdef XPRESS
return port::xpress::Decompress(input_data, input_length, decompress_size);
#endif
return nullptr;
}
inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef ZSTD #ifdef ZSTD

@ -572,6 +572,7 @@ static std::unordered_map<std::string, CompressionType>
{"kBZip2Compression", kBZip2Compression}, {"kBZip2Compression", kBZip2Compression},
{"kLZ4Compression", kLZ4Compression}, {"kLZ4Compression", kLZ4Compression},
{"kLZ4HCCompression", kLZ4HCCompression}, {"kLZ4HCCompression", kLZ4HCCompression},
{"kXpressCompression", kXpressCompression },
{"kZSTDNotFinalCompression", kZSTDNotFinalCompression}}; {"kZSTDNotFinalCompression", kZSTDNotFinalCompression}};
static std::unordered_map<std::string, BlockBasedTableOptions::IndexType> static std::unordered_map<std::string, BlockBasedTableOptions::IndexType>

@ -101,9 +101,10 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
"kBZip2Compression:" "kBZip2Compression:"
"kLZ4Compression:" "kLZ4Compression:"
"kLZ4HCCompression:" "kLZ4HCCompression:"
"kXpressCompression:"
"kZSTDNotFinalCompression"}, "kZSTDNotFinalCompression"},
{"compression_opts", "4:5:6"}, {"compression_opts", "4:5:6"},
{"num_levels", "7"}, {"num_levels", "8"},
{"level0_file_num_compaction_trigger", "8"}, {"level0_file_num_compaction_trigger", "8"},
{"level0_slowdown_writes_trigger", "9"}, {"level0_slowdown_writes_trigger", "9"},
{"level0_stop_writes_trigger", "10"}, {"level0_stop_writes_trigger", "10"},
@ -188,18 +189,19 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3); ASSERT_EQ(new_cf_opt.min_write_buffer_number_to_merge, 3);
ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99); ASSERT_EQ(new_cf_opt.max_write_buffer_number_to_maintain, 99);
ASSERT_EQ(new_cf_opt.compression, kSnappyCompression); ASSERT_EQ(new_cf_opt.compression, kSnappyCompression);
ASSERT_EQ(new_cf_opt.compression_per_level.size(), 7U); ASSERT_EQ(new_cf_opt.compression_per_level.size(), 8U);
ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression); ASSERT_EQ(new_cf_opt.compression_per_level[0], kNoCompression);
ASSERT_EQ(new_cf_opt.compression_per_level[1], kSnappyCompression); ASSERT_EQ(new_cf_opt.compression_per_level[1], kSnappyCompression);
ASSERT_EQ(new_cf_opt.compression_per_level[2], kZlibCompression); ASSERT_EQ(new_cf_opt.compression_per_level[2], kZlibCompression);
ASSERT_EQ(new_cf_opt.compression_per_level[3], kBZip2Compression); ASSERT_EQ(new_cf_opt.compression_per_level[3], kBZip2Compression);
ASSERT_EQ(new_cf_opt.compression_per_level[4], kLZ4Compression); ASSERT_EQ(new_cf_opt.compression_per_level[4], kLZ4Compression);
ASSERT_EQ(new_cf_opt.compression_per_level[5], kLZ4HCCompression); ASSERT_EQ(new_cf_opt.compression_per_level[5], kLZ4HCCompression);
ASSERT_EQ(new_cf_opt.compression_per_level[6], kZSTDNotFinalCompression); ASSERT_EQ(new_cf_opt.compression_per_level[6], kXpressCompression);
ASSERT_EQ(new_cf_opt.compression_per_level[7], kZSTDNotFinalCompression);
ASSERT_EQ(new_cf_opt.compression_opts.window_bits, 4); ASSERT_EQ(new_cf_opt.compression_opts.window_bits, 4);
ASSERT_EQ(new_cf_opt.compression_opts.level, 5); ASSERT_EQ(new_cf_opt.compression_opts.level, 5);
ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6); ASSERT_EQ(new_cf_opt.compression_opts.strategy, 6);
ASSERT_EQ(new_cf_opt.num_levels, 7); ASSERT_EQ(new_cf_opt.num_levels, 8);
ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8); ASSERT_EQ(new_cf_opt.level0_file_num_compaction_trigger, 8);
ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9); ASSERT_EQ(new_cf_opt.level0_slowdown_writes_trigger, 9);
ASSERT_EQ(new_cf_opt.level0_stop_writes_trigger, 10); ASSERT_EQ(new_cf_opt.level0_stop_writes_trigger, 10);

Loading…
Cancel
Save