Provide a way to override windows memory allocator with jemalloc for ZSTD

Summary:
Windows does not have LD_PRELOAD mechanism to override all memory allocation functions and ZSTD makes use of C-tuntime calloc. During flushes and compactions default system allocator fragments and the system slows down considerably.

For builds with jemalloc we employ an advanced ZSTD context creation API that re-directs memory allocation to jemalloc. To reduce the cost of context creation on each block we cache ZSTD context within the block based table builder while a new SST file is being built, this will help all platform builds including those w/o jemalloc. This avoids system allocator fragmentation and improves the performance.

The change does not address random reads and currently on Windows reads with ZSTD regress as compared with SNAPPY compression.
Closes https://github.com/facebook/rocksdb/pull/3838

Differential Revision: D8229794

Pulled By: miasantreble

fbshipit-source-id: 719b622ab7bf4109819bc44f45ec66f0dd3ee80d
main
Dmitri Smirnov 7 years ago committed by Facebook Github Bot
parent 4f297ad05f
commit f4b72d7056
  1. 13
      CMakeLists.txt
  2. 1
      TARGETS
  3. 12
      port/win/env_default.cc
  4. 19
      port/win/win_jemalloc.cc
  5. 1
      src.mk
  6. 72
      table/block_based_table_builder.cc
  7. 12
      table/block_based_table_builder.h
  8. 13
      table/block_based_table_reader.cc
  9. 6
      table/block_fetcher.cc
  10. 35
      table/format.cc
  11. 9
      table/format.h
  12. 92
      thirdparty.inc
  13. 45
      tools/db_bench_tool.cc
  14. 317
      util/compression.h
  15. 111
      util/compression_context_cache.cc
  16. 45
      util/compression_context_cache.h
  17. 3
      util/thread_local.h
  18. 10
      utilities/blob_db/blob_db_impl.cc
  19. 4
      utilities/blob_db/blob_dump_tool.cc
  20. 20
      utilities/column_aware_encoding_util.cc

@ -322,15 +322,25 @@ if(DEFINED USE_RTTI)
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI")
else() else()
message(STATUS "Disabling RTTI") if(MSVC)
message(STATUS "Disabling RTTI in Release builds. Always on in Debug.")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
message(STATUS "Disabling RTTI in Release builds")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif() endif()
endif()
else() else()
message(STATUS "Enabling RTTI in Debug builds only (default)") message(STATUS "Enabling RTTI in Debug builds only (default)")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI")
if(MSVC)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-")
else()
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti")
endif() endif()
endif()
if(MSVC) if(MSVC)
if((${OPTIMIZE_DEBUG} EQUAL 1)) if((${OPTIMIZE_DEBUG} EQUAL 1))
@ -574,6 +584,7 @@ set(SOURCES
util/coding.cc util/coding.cc
util/compaction_job_stats_impl.cc util/compaction_job_stats_impl.cc
util/comparator.cc util/comparator.cc
util/compression_context_cache.cc
util/concurrent_arena.cc util/concurrent_arena.cc
util/crc32c.cc util/crc32c.cc
util/delete_scheduler.cc util/delete_scheduler.cc

@ -200,6 +200,7 @@ cpp_library(
"util/coding.cc", "util/coding.cc",
"util/compaction_job_stats_impl.cc", "util/compaction_job_stats_impl.cc",
"util/comparator.cc", "util/comparator.cc",
"util/compression_context_cache.cc",
"util/concurrent_arena.cc", "util/concurrent_arena.cc",
"util/crc32c.cc", "util/crc32c.cc",
"util/delete_scheduler.cc", "util/delete_scheduler.cc",

@ -11,16 +11,13 @@
#include <rocksdb/env.h> #include <rocksdb/env.h>
#include "port/win/env_win.h" #include "port/win/env_win.h"
#include "util/compression_context_cache.h"
#include "util/thread_local.h"
namespace rocksdb { namespace rocksdb {
namespace port { namespace port {
// We choose to create this on the heap and using std::once for the following // We choose not to destroy the env because joining the threads from the
// reasons
// 1) Currently available MS compiler does not implement atomic C++11
// initialization of
// function local statics
// 2) We choose not to destroy the env because joining the threads from the
// system loader // system loader
// which destroys the statics (same as from DLLMain) creates a system loader // which destroys the statics (same as from DLLMain) creates a system loader
// dead-lock. // dead-lock.
@ -29,11 +26,12 @@ namespace {
std::once_flag winenv_once_flag; std::once_flag winenv_once_flag;
Env* envptr; Env* envptr;
}; };
} }
Env* Env::Default() { Env* Env::Default() {
using namespace port; using namespace port;
ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); });
return envptr; return envptr;
} }

@ -14,6 +14,25 @@
#include <stdexcept> #include <stdexcept>
#include "jemalloc/jemalloc.h" #include "jemalloc/jemalloc.h"
#if defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY)
#include <zstd.h>
#if (ZSTD_VERSION_NUMBER >= 500)
namespace rocksdb {
namespace port {
void* JemallocAllocateForZSTD(void* /* opaque */, size_t size) {
return je_malloc(size);
}
void JemallocDeallocateForZSTD(void* /* opaque */, void* address) {
je_free(address);
}
ZSTD_customMem GetJeZstdAllocationOverrides() {
return { JemallocAllocateForZSTD, JemallocDeallocateForZSTD, nullptr };
}
} // namespace port
} // namespace rocksdb
#endif // (ZSTD_VERSION_NUMBER >= 500)
#endif // defined(ZSTD) defined(ZSTD_STATIC_LINKING_ONLY)
// Global operators to be replaced by a linker when this file is // Global operators to be replaced by a linker when this file is
// a part of the build // a part of the build

@ -129,6 +129,7 @@ LIB_SOURCES = \
util/coding.cc \ util/coding.cc \
util/compaction_job_stats_impl.cc \ util/compaction_job_stats_impl.cc \
util/comparator.cc \ util/comparator.cc \
util/compression_context_cache.cc \
util/concurrent_arena.cc \ util/concurrent_arena.cc \
util/crc32c.cc \ util/crc32c.cc \
util/delete_scheduler.cc \ util/delete_scheduler.cc \

@ -104,19 +104,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options, const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
const Slice& compression_dict,
std::string* compressed_output) { std::string* compressed_output) {
if (*type == kNoCompression) { *type = compression_ctx.type();
if (compression_ctx.type() == kNoCompression) {
return raw; return raw;
} }
// Will return compressed block contents if (1) the compression method is // Will return compressed block contents if (1) the compression method is
// supported in this platform and (2) the compression rate is "good enough". // supported in this platform and (2) the compression rate is "good enough".
switch (*type) { switch (compression_ctx.type()) {
case kSnappyCompression: case kSnappyCompression:
if (Snappy_Compress(compression_options, raw.data(), raw.size(), if (Snappy_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output) && compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
@ -124,16 +124,16 @@ Slice CompressBlock(const Slice& raw,
break; // fall back to no compression. break; // fall back to no compression.
case kZlibCompression: case kZlibCompression:
if (Zlib_Compress( if (Zlib_Compress(
compression_options, compression_ctx,
GetCompressFormatForVersion(kZlibCompression, format_version), GetCompressFormatForVersion(kZlibCompression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kBZip2Compression: case kBZip2Compression:
if (BZip2_Compress( if (BZip2_Compress(
compression_options, compression_ctx,
GetCompressFormatForVersion(kBZip2Compression, format_version), GetCompressFormatForVersion(kBZip2Compression, format_version),
raw.data(), raw.size(), compressed_output) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
@ -142,18 +142,18 @@ Slice CompressBlock(const Slice& raw,
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4Compression: case kLZ4Compression:
if (LZ4_Compress( if (LZ4_Compress(
compression_options, compression_ctx,
GetCompressFormatForVersion(kLZ4Compression, format_version), GetCompressFormatForVersion(kLZ4Compression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
break; // fall back to no compression. break; // fall back to no compression.
case kLZ4HCCompression: case kLZ4HCCompression:
if (LZ4HC_Compress( if (LZ4HC_Compress(
compression_options, compression_ctx,
GetCompressFormatForVersion(kLZ4HCCompression, format_version), GetCompressFormatForVersion(kLZ4HCCompression, format_version),
raw.data(), raw.size(), compressed_output, compression_dict) && raw.data(), raw.size(), compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
@ -167,8 +167,8 @@ Slice CompressBlock(const Slice& raw,
break; break;
case kZSTD: case kZSTD:
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
if (ZSTD_Compress(compression_options, raw.data(), raw.size(), if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(),
compressed_output, compression_dict) && compressed_output) &&
GoodCompressionRatio(compressed_output->size(), raw.size())) { GoodCompressionRatio(compressed_output->size(), raw.size())) {
return *compressed_output; return *compressed_output;
} }
@ -261,10 +261,10 @@ struct BlockBasedTableBuilder::Rep {
PartitionedIndexBuilder* p_index_builder_ = nullptr; PartitionedIndexBuilder* p_index_builder_ = nullptr;
std::string last_key; std::string last_key;
const CompressionType compression_type; // Compression dictionary or nullptr
const CompressionOptions compression_opts;
// Data for presetting the compression library's dictionary, or nullptr.
const std::string* compression_dict; const std::string* compression_dict;
CompressionContext compression_ctx;
std::unique_ptr<UncompressionContext> verify_ctx;
TableProperties props; TableProperties props;
bool closed = false; // Either Finish() or Abandon() has been called. bool closed = false; // Either Finish() or Abandon() has been called.
@ -306,9 +306,8 @@ struct BlockBasedTableBuilder::Rep {
table_options.use_delta_encoding), table_options.use_delta_encoding),
range_del_block(1 /* block_restart_interval */), range_del_block(1 /* block_restart_interval */),
internal_prefix_transform(_moptions.prefix_extractor.get()), internal_prefix_transform(_moptions.prefix_extractor.get()),
compression_type(_compression_type),
compression_opts(_compression_opts),
compression_dict(_compression_dict), compression_dict(_compression_dict),
compression_ctx(_compression_type, _compression_opts),
compressed_cache_key_prefix_size(0), compressed_cache_key_prefix_size(0),
flush_block_policy( flush_block_policy(
table_options.flush_block_policy_factory->NewFlushBlockPolicy( table_options.flush_block_policy_factory->NewFlushBlockPolicy(
@ -342,6 +341,16 @@ struct BlockBasedTableBuilder::Rep {
new BlockBasedTablePropertiesCollector( new BlockBasedTablePropertiesCollector(
table_options.index_type, table_options.whole_key_filtering, table_options.index_type, table_options.whole_key_filtering,
_moptions.prefix_extractor != nullptr)); _moptions.prefix_extractor != nullptr));
if (table_options.verify_compression) {
verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(),
compression_ctx.type()));
}
}
Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;
~Rep() {
} }
}; };
@ -480,7 +489,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
assert(ok()); assert(ok());
Rep* r = rep_; Rep* r = rep_;
auto type = r->compression_type; auto type = r->compression_ctx.type();
Slice block_contents; Slice block_contents;
bool abort_compression = false; bool abort_compression = false;
@ -490,12 +499,23 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (raw_block_contents.size() < kCompressionSizeLimit) { if (raw_block_contents.size() < kCompressionSizeLimit) {
Slice compression_dict; Slice compression_dict;
if (is_data_block && r->compression_dict && r->compression_dict->size()) { if (is_data_block && r->compression_dict && r->compression_dict->size()) {
compression_dict = *r->compression_dict; r->compression_ctx.dict() = *r->compression_dict;
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = *r->compression_dict;
}
} else {
// Clear dictionary
r->compression_ctx.dict() = Slice();
if (r->table_options.verify_compression) {
assert(r->verify_ctx != nullptr);
r->verify_ctx->dict() = Slice();
}
} }
block_contents = CompressBlock(raw_block_contents, r->compression_opts, block_contents = CompressBlock(raw_block_contents, r->compression_ctx,
&type, r->table_options.format_version, &type, r->table_options.format_version,
compression_dict, &r->compressed_output); &r->compressed_output);
// Some of the compression algorithms are known to be unreliable. If // Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the // the verify_compression flag is set then try to de-compress the
@ -503,9 +523,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (type != kNoCompression && r->table_options.verify_compression) { if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
Status stat = UncompressBlockContentsForCompressionType( Status stat = UncompressBlockContentsForCompressionType(*r->verify_ctx,
block_contents.data(), block_contents.size(), &contents, block_contents.data(), block_contents.size(), &contents,
r->table_options.format_version, compression_dict, type, r->table_options.format_version,
r->ioptions); r->ioptions);
if (stat.ok()) { if (stat.ok()) {
@ -739,7 +759,7 @@ Status BlockBasedTableBuilder::Finish() {
r->props.merge_operator_name = r->ioptions.merge_operator != nullptr r->props.merge_operator_name = r->ioptions.merge_operator != nullptr
? r->ioptions.merge_operator->Name() ? r->ioptions.merge_operator->Name()
: "nullptr"; : "nullptr";
r->props.compression_name = CompressionTypeToString(r->compression_type); r->props.compression_name = CompressionTypeToString(r->compression_ctx.type());
r->props.prefix_extractor_name = r->props.prefix_extractor_name =
r->moptions.prefix_extractor != nullptr r->moptions.prefix_extractor != nullptr
? r->moptions.prefix_extractor->Name() ? r->moptions.prefix_extractor->Name()

@ -19,6 +19,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "table/table_builder.h" #include "table/table_builder.h"
#include "util/compression.h"
namespace rocksdb { namespace rocksdb {
@ -53,6 +54,10 @@ class BlockBasedTableBuilder : public TableBuilder {
// REQUIRES: Either Finish() or Abandon() has been called. // REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder(); ~BlockBasedTableBuilder();
// No copying allowed
BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete;
BlockBasedTableBuilder& operator=(const BlockBasedTableBuilder&) = delete;
// Add key,value to the table being constructed. // Add key,value to the table being constructed.
// REQUIRES: key is after any previously added key according to comparator. // REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: Finish(), Abandon() have not been called // REQUIRES: Finish(), Abandon() have not been called
@ -115,16 +120,11 @@ class BlockBasedTableBuilder : public TableBuilder {
// Some compression libraries fail when the raw size is bigger than int. If // Some compression libraries fail when the raw size is bigger than int. If
// uncompressed size is bigger than kCompressionSizeLimit, don't compress it // uncompressed size is bigger than kCompressionSizeLimit, don't compress it
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
// No copying allowed
BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete;
void operator=(const BlockBasedTableBuilder&) = delete;
}; };
Slice CompressBlock(const Slice& raw, Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options, const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
const Slice& compression_dict,
std::string* compressed_output); std::string* compressed_output);
} // namespace rocksdb } // namespace rocksdb

@ -1104,9 +1104,12 @@ Status BlockBasedTable::GetDataBlockFromCache(
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
s = UncompressBlockContents(compressed_block->data(), UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompresssion_ctx,
compressed_block->data(),
compressed_block->size(), &contents, compressed_block->size(), &contents,
format_version, compression_dict, format_version,
ioptions); ioptions);
// Insert uncompressed block into block cache // Insert uncompressed block into block cache
@ -1182,8 +1185,10 @@ Status BlockBasedTable::PutDataBlockToCache(
BlockContents contents; BlockContents contents;
Statistics* statistics = ioptions.statistics; Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents, UncompressionContext uncompression_ctx(raw_block->compression_type(), compression_dict);
format_version, compression_dict, ioptions); s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents,
format_version, ioptions);
} }
if (!s.ok()) { if (!s.ok()) {
delete raw_block; delete raw_block;

@ -225,8 +225,10 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type != kNoCompression) { if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache // compressed page, uncompress, update cache
status_ = UncompressBlockContents(slice_.data(), block_size_, contents_, UncompressionContext uncompression_ctx(compression_type, compression_dict_);
footer_.version(), compression_dict_, status_ = UncompressBlockContents(uncompression_ctx,
slice_.data(), block_size_, contents_,
footer_.version(),
ioptions_); ioptions_);
} else { } else {
GetBlockContents(); GetBlockContents();

@ -264,18 +264,17 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
return Status::OK(); return Status::OK();
} }
Status UncompressBlockContentsForCompressionType( Status UncompressBlockContentsForCompressionType(const UncompressionContext& uncompression_ctx,
const char* data, size_t n, BlockContents* contents, const char* data, size_t n, BlockContents* contents,
uint32_t format_version, const Slice& compression_dict, uint32_t format_version, const ImmutableCFOptions &ioptions) {
CompressionType compression_type, const ImmutableCFOptions &ioptions) {
std::unique_ptr<char[]> ubuf; std::unique_ptr<char[]> ubuf;
assert(compression_type != kNoCompression && "Invalid compression type"); assert(uncompression_ctx.type() != kNoCompression && "Invalid compression type");
StopWatchNano timer(ioptions.env, StopWatchNano timer(ioptions.env,
ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
int decompress_size = 0; int decompress_size = 0;
switch (compression_type) { switch (uncompression_ctx.type()) {
case kSnappyCompression: { case kSnappyCompression: {
size_t ulength = 0; size_t ulength = 0;
static char snappy_corrupt_msg[] = static char snappy_corrupt_msg[] =
@ -291,10 +290,9 @@ Status UncompressBlockContentsForCompressionType(
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf.reset(Zlib_Uncompress( ubuf.reset(Zlib_Uncompress(uncompression_ctx,
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version), GetCompressFormatForVersion(kZlibCompression, format_version)));
compression_dict));
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
"Zlib not supported or corrupted Zlib compressed block contents"; "Zlib not supported or corrupted Zlib compressed block contents";
@ -316,10 +314,9 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf.reset(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(uncompression_ctx,
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version), GetCompressFormatForVersion(kLZ4Compression, format_version)));
compression_dict));
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
"LZ4 not supported or corrupted LZ4 compressed block contents"; "LZ4 not supported or corrupted LZ4 compressed block contents";
@ -329,10 +326,9 @@ Status UncompressBlockContentsForCompressionType(
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf.reset(LZ4_Uncompress( ubuf.reset(LZ4_Uncompress(uncompression_ctx,
data, n, &decompress_size, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version), GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
compression_dict));
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
"LZ4HC not supported or corrupted LZ4HC compressed block contents"; "LZ4HC not supported or corrupted LZ4HC compressed block contents";
@ -353,7 +349,7 @@ Status UncompressBlockContentsForCompressionType(
break; break;
case kZSTD: case kZSTD:
case kZSTDNotFinalCompression: case kZSTDNotFinalCompression:
ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict)); ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size));
if (!ubuf) { if (!ubuf) {
static char zstd_corrupt_msg[] = static char zstd_corrupt_msg[] =
"ZSTD not supported or corrupted ZSTD compressed block contents"; "ZSTD not supported or corrupted ZSTD compressed block contents";
@ -383,14 +379,15 @@ Status UncompressBlockContentsForCompressionType(
// buffer is returned via 'result' and it is upto the caller to // buffer is returned via 'result' and it is upto the caller to
// free this buffer. // free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
Status UncompressBlockContents(const char* data, size_t n, Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, uint32_t format_version, BlockContents* contents, uint32_t format_version,
const Slice& compression_dict,
const ImmutableCFOptions &ioptions) { const ImmutableCFOptions &ioptions) {
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType( return UncompressBlockContentsForCompressionType(
data, n, contents, format_version, compression_dict, uncompression_ctx, data, n, contents,
(CompressionType)data[n], ioptions); format_version, ioptions);
} }
} // namespace rocksdb } // namespace rocksdb

@ -228,19 +228,20 @@ extern Status ReadBlockContents(
// free this buffer. // free this buffer.
// For description of compress_format_version and possible values, see // For description of compress_format_version and possible values, see
// util/compression.h // util/compression.h
extern Status UncompressBlockContents(const char* data, size_t n, extern Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
const char* data, size_t n,
BlockContents* contents, BlockContents* contents,
uint32_t compress_format_version, uint32_t compress_format_version,
const Slice& compression_dict,
const ImmutableCFOptions &ioptions); const ImmutableCFOptions &ioptions);
// This is an extension to UncompressBlockContents that accepts // This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks // a specific compression type. This is used by un-wrapped blocks
// with no compression header. // with no compression header.
extern Status UncompressBlockContentsForCompressionType( extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx,
const char* data, size_t n, BlockContents* contents, const char* data, size_t n, BlockContents* contents,
uint32_t compress_format_version, const Slice& compression_dict, uint32_t compress_format_version,
CompressionType compression_type, const ImmutableCFOptions& ioptions); const ImmutableCFOptions& ioptions);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,

@ -12,9 +12,9 @@ set (THIRDPARTY_LIBS "") # Initialization, don't touch
# Defaults # Defaults
# #
set(GFLAGS_HOME $ENV{THIRDPARTY_HOME}/Gflags.Library) set(GFLAGS_HOME $ENV{THIRDPARTY_HOME}/Gflags.Library)
set(GFLAGS_INCLUDE ${GFLAGS_HOME}/inc/include) set(GFLAGS_INCLUDE ${GFLAGS_HOME}/build/native/include)
set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/bin/debug/amd64/gflags.lib) set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/lib/native/debug/amd64/gflags.lib)
set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/bin/retail/amd64/gflags.lib) set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/lib/native/retail/amd64/gflags.lib)
# ================================================== GFLAGS ================================================== # ================================================== GFLAGS ==================================================
# For compatibility # For compatibility
@ -52,9 +52,9 @@ endif ()
# Edit these 4 lines to define paths to Snappy # Edit these 4 lines to define paths to Snappy
# #
set(SNAPPY_HOME $ENV{THIRDPARTY_HOME}/Snappy.Library) set(SNAPPY_HOME $ENV{THIRDPARTY_HOME}/Snappy.Library)
set(SNAPPY_INCLUDE ${SNAPPY_HOME}/inc/inc) set(SNAPPY_INCLUDE ${SNAPPY_HOME}/build/native/inc/inc)
set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/bin/debug/amd64/snappy.lib) set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/lib/native/debug/amd64/snappy.lib)
set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/bin/retail/amd64/snappy.lib) set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/lib/native/retail/amd64/snappy.lib)
# For compatibility # For compatibility
if(SNAPPY) if(SNAPPY)
@ -91,14 +91,13 @@ endif ()
# Edit these 4 lines to define paths to LZ4 # Edit these 4 lines to define paths to LZ4
# #
set(LZ4_HOME $ENV{THIRDPARTY_HOME}/LZ4.Library) set(LZ4_HOME $ENV{THIRDPARTY_HOME}/LZ4.Library)
set(LZ4_INCLUDE ${LZ4_HOME}/inc/include) set(LZ4_INCLUDE ${LZ4_HOME}/build/native/inc/inc)
set(LZ4_LIB_DEBUG ${LZ4_HOME}/bin/debug/amd64/lz4.lib) set(LZ4_LIB_DEBUG ${LZ4_HOME}/lib/native/debug/amd64/lz4.lib)
set(LZ4_LIB_RELEASE ${LZ4_HOME}/bin/retail/amd64/lz4.lib) set(LZ4_LIB_RELEASE ${LZ4_HOME}/lib/native/retail/amd64/lz4.lib)
#
# Don't touch these lines # For compatibility
# if (LZ4)
if (DEFINED LZ4)
set(WITH_LZ4 ON) set(WITH_LZ4 ON)
endif () endif ()
@ -132,13 +131,9 @@ endif ()
# Edit these 4 lines to define paths to ZLIB # Edit these 4 lines to define paths to ZLIB
# #
set(ZLIB_HOME $ENV{THIRDPARTY_HOME}/ZLIB.Library) set(ZLIB_HOME $ENV{THIRDPARTY_HOME}/ZLIB.Library)
set(ZLIB_INCLUDE ${ZLIB_HOME}/inc/include) set(ZLIB_INCLUDE ${ZLIB_HOME}/build/native/inc/inc)
set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/bin/debug/amd64/zlib.lib) set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/lib/native/debug/amd64/zlib.lib)
set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/bin/retail/amd64/zlib.lib) set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/lib/native/retail/amd64/zlib.lib)
#
# Don't touch these lines
#
# For compatibilty # For compatibilty
if (ZLIB) if (ZLIB)
@ -170,6 +165,9 @@ else ()
message(STATUS "ZLIB library is disabled") message(STATUS "ZLIB library is disabled")
endif () endif ()
# ================================================== XPRESS ==================================================
# This makes use of built-in Windows API, no additional includes, links to a system lib
# For compatibilty # For compatibilty
if (XPRESS) if (XPRESS)
set(WITH_XPRESS ON) set(WITH_XPRESS ON)
@ -186,20 +184,56 @@ else ()
message(STATUS "XPRESS is disabled") message(STATUS "XPRESS is disabled")
endif () endif ()
# ================================================== ZSTD ==================================================
# #
# Edit these 4 lines to define paths to Jemalloc # Edit these 4 lines to define paths to ZSTD
# #
set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library) set(ZSTD_HOME $ENV{THIRDPARTY_HOME}/ZSTD.Library)
set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/inc/include) set(ZSTD_INCLUDE ${ZSTD_HOME}/build/native/inc)
set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/bin/debug/amd64/jemalloc.lib) set(ZSTD_LIB_DEBUG ${ZSTD_HOME}/lib/native/debug/amd64/libzstd_static.lib)
set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/bin/retail/amd64/jemalloc.lib) set(ZSTD_LIB_RELEASE ${ZSTD_HOME}/lib/native/retail/amd64/libzstd_static.lib)
# For compatibility
if (ZSTD)
set(WITH_ZSTD ON)
endif ()
if (WITH_ZSTD)
message(STATUS "ZSTD library is enabled")
if(DEFINED ENV{ZSTD_INCLUDE})
set(ZSTD_INCLUDE $ENV{ZSTD_INCLUDE})
endif()
if(DEFINED ENV{ZSTD_LIB_DEBUG})
set(ZSTD_LIB_DEBUG $ENV{ZSTD_LIB_DEBUG})
endif()
if(DEFINED ENV{ZSTD_LIB_RELEASE})
set(ZSTD_LIB_RELEASE $ENV{ZSTD_LIB_RELEASE})
endif()
# ZSTD_STATIC_LINKING_ONLY only allows us to create an allocation functions override
# When jemalloc is in use
set(ZSTD_LIBS debug ${ZSTD_LIB_DEBUG} optimized ${ZSTD_LIB_RELEASE})
add_definitions(-DZSTD -DZSTD_STATIC_LINKING_ONLY)
include_directories(${ZSTD_INCLUDE})
set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${ZSTD_LIBS})
else ()
message(STATUS "ZSTD library is disabled")
endif ()
# ================================================== JEMALLOC ==================================================
# #
# Don't touch these lines # Edit these 4 lines to define paths to Jemalloc
# #
set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library)
set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/build/native/inc)
set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/lib/native/debug/amd64/jemalloc.lib)
set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/lib/native/retail/amd64/jemalloc.lib)
# For compatibilty # ================================================== JEMALLOC ==================================================
if(JEMALLOC) if(JEMALLOC)
set(WITH_JEMALLOC ON) set(WITH_JEMALLOC ON)
endif() endif()
@ -227,8 +261,6 @@ if (WITH_JEMALLOC)
set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${JEMALLOC_LIBS}) set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${JEMALLOC_LIBS})
set (ARTIFACT_SUFFIX "_je") set (ARTIFACT_SUFFIX "_je")
set(WITH_JEMALLOC ON)
else () else ()
set (ARTIFACT_SUFFIX "") set (ARTIFACT_SUFFIX "")
message(STATUS "JEMALLOC library is disabled") message(STATUS "JEMALLOC library is disabled")

@ -1941,27 +1941,28 @@ class Benchmark {
return true; return true;
} }
inline bool CompressSlice(const Slice& input, std::string* compressed) { inline bool CompressSlice(const CompressionContext& compression_ctx,
const Slice& input, std::string* compressed) {
bool ok = true; bool ok = true;
switch (FLAGS_compression_type_e) { switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: case rocksdb::kSnappyCompression:
ok = Snappy_Compress(Options().compression_opts, input.data(), ok = Snappy_Compress(compression_ctx, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
ok = Zlib_Compress(Options().compression_opts, 2, input.data(), ok = Zlib_Compress(compression_ctx, 2, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
ok = BZip2_Compress(Options().compression_opts, 2, input.data(), ok = BZip2_Compress(compression_ctx, 2, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
ok = LZ4_Compress(Options().compression_opts, 2, input.data(), ok = LZ4_Compress(compression_ctx, 2, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(), ok = LZ4HC_Compress(compression_ctx, 2, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kXpressCompression: case rocksdb::kXpressCompression:
@ -1969,7 +1970,7 @@ class Benchmark {
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
ok = ZSTD_Compress(Options().compression_opts, input.data(), ok = ZSTD_Compress(compression_ctx, input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
default: default:
@ -2052,7 +2053,8 @@ class Benchmark {
const int len = FLAGS_block_size; const int len = FLAGS_block_size;
std::string input_str(len, 'y'); std::string input_str(len, 'y');
std::string compressed; std::string compressed;
bool result = CompressSlice(Slice(input_str), &compressed); CompressionContext compression_ctx(FLAGS_compression_type_e, Options().compression_opts);
bool result = CompressSlice(compression_ctx, Slice(input_str), &compressed);
if (!result) { if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n", fprintf(stdout, "WARNING: %s compression is not enabled\n",
@ -2194,10 +2196,11 @@ class Benchmark {
merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys),
report_file_operations_(FLAGS_report_file_operations), report_file_operations_(FLAGS_report_file_operations),
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
use_blob_db_(FLAGS_use_blob_db) { use_blob_db_(FLAGS_use_blob_db)
#else #else
use_blob_db_(false) { use_blob_db_(false)
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
{
// use simcache instead of cache // use simcache instead of cache
if (FLAGS_simcache_size >= 0) { if (FLAGS_simcache_size >= 0) {
if (FLAGS_cache_numshardbits >= 1) { if (FLAGS_cache_numshardbits >= 1) {
@ -2843,11 +2846,13 @@ void VerifyDBFromDB(std::string& truth_db_name) {
int64_t produced = 0; int64_t produced = 0;
bool ok = true; bool ok = true;
std::string compressed; std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e,
Options().compression_opts);
// Compress 1G // Compress 1G
while (ok && bytes < int64_t(1) << 30) { while (ok && bytes < int64_t(1) << 30) {
compressed.clear(); compressed.clear();
ok = CompressSlice(input, &compressed); ok = CompressSlice(compression_ctx, input, &compressed);
produced += compressed.size(); produced += compressed.size();
bytes += input.size(); bytes += input.size();
thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress);
@ -2869,7 +2874,11 @@ void VerifyDBFromDB(std::string& truth_db_name) {
Slice input = gen.Generate(FLAGS_block_size); Slice input = gen.Generate(FLAGS_block_size);
std::string compressed; std::string compressed;
bool ok = CompressSlice(input, &compressed); UncompressionContext uncompression_ctx(FLAGS_compression_type_e);
CompressionContext compression_ctx(FLAGS_compression_type_e,
Options().compression_opts);
bool ok = CompressSlice(compression_ctx, input, &compressed);
int64_t bytes = 0; int64_t bytes = 0;
int decompress_size; int decompress_size;
while (ok && bytes < 1024 * 1048576) { while (ok && bytes < 1024 * 1048576) {
@ -2889,7 +2898,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
break; break;
} }
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(), uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(),
&decompress_size, 2); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
@ -2899,12 +2909,14 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(),
&decompress_size, 2); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(),
&decompress_size, 2); &decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
@ -2914,7 +2926,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(), uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(),
&decompress_size); &decompress_size);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;

@ -15,6 +15,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression_context_cache.h"
#ifdef SNAPPY #ifdef SNAPPY
#include <snappy.h> #include <snappy.h>
@ -38,14 +39,215 @@
#if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+ #if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+
#include <zdict.h> #include <zdict.h>
#endif // ZSTD_VERSION_NUMBER >= 800 #endif // ZSTD_VERSION_NUMBER >= 800
namespace rocksdb {
// Need this for the context allocation override
// On windows we need to do this explicitly
#if (ZSTD_VERSION_NUMBER >= 500)
#if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \
defined(ZSTD_STATIC_LINKING_ONLY)
#define ROCKSDB_ZSTD_CUSTOM_MEM
namespace port {
ZSTD_customMem GetJeZstdAllocationOverrides();
} // namespace port
#endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) &&
// defined(ZSTD_STATIC_LINKING_ONLY)
// Cached data represents a portion that can be re-used
// If, in the future we have more than one native context to
// cache we can arrange this as a tuple
class ZSTDUncompressCachedData {
public:
using ZSTDNativeContext = ZSTD_DCtx*;
ZSTDUncompressCachedData() {}
// Init from cache
ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT :
ZSTDUncompressCachedData() {
*this = std::move(o);
}
ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT {
assert(zstd_ctx_ == nullptr);
std::swap(zstd_ctx_,o.zstd_ctx_);
std::swap(cache_idx_,o.cache_idx_);
return *this;
}
ZSTDNativeContext Get() const {
return zstd_ctx_;
}
int64_t GetCacheIndex() const {
return cache_idx_;
}
void CreateIfNeeded() {
if (zstd_ctx_ == nullptr) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else // ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createDCtx();
#endif // ROCKSDB_ZSTD_CUSTOM_MEM
cache_idx_ = -1;
}
}
void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) {
zstd_ctx_ = o.zstd_ctx_;
cache_idx_ = idx;
}
~ZSTDUncompressCachedData() {
if (zstd_ctx_ != nullptr && cache_idx_ == -1) {
ZSTD_freeDCtx(zstd_ctx_);
}
}
private:
ZSTDNativeContext zstd_ctx_ = nullptr;
int64_t cache_idx_ = -1; // -1 means this instance owns the context
};
#endif // (ZSTD_VERSION_NUMBER >= 500)
} // namespace rocksdb
#endif // ZSTD #endif // ZSTD
#if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500)
namespace rocksdb {
class ZSTDUncompressCachedData {
void* padding; // unused
public:
using ZSTDNativeContext = void*;
ZSTDUncompressCachedData() {}
ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default;
ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default;
ZSTDNativeContext Get() const {
return nullptr;
}
int64_t GetCacheIndex() const {
return -1;
}
void CreateIfNeeded() {}
void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}
};
} // namespace rocksdb
#endif
#if defined(XPRESS) #if defined(XPRESS)
#include "port/xpress.h" #include "port/xpress.h"
#endif #endif
namespace rocksdb { namespace rocksdb {
// Instantiate this class and pass it to the uncompression API below
class CompressionContext {
private:
const CompressionType type_;
const CompressionOptions opts_;
Slice dict_;
#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500)
ZSTD_CCtx* zstd_ctx_ = nullptr;
void CreateNativeContext() {
if (type_ == kZSTD) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else // ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createCCtx();
#endif // ROCKSDB_ZSTD_CUSTOM_MEM
}
}
void DestroyNativeContext() {
if (zstd_ctx_ != nullptr) {
ZSTD_freeCCtx(zstd_ctx_);
}
}
public:
// callable inside ZSTD_Compress
ZSTD_CCtx * ZSTDPreallocCtx() const {
assert(type_ == kZSTD);
return zstd_ctx_;
}
#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
private:
void CreateNativeContext() {}
void DestroyNativeContext() {}
#endif //ZSTD && (ZSTD_VERSION_NUMBER >= 500)
public:
explicit CompressionContext(CompressionType comp_type) :
type_(comp_type) {
CreateNativeContext();
}
CompressionContext(CompressionType comp_type,
const CompressionOptions& opts,
const Slice& comp_dict = Slice()) :
type_(comp_type),
opts_(opts),
dict_(comp_dict) {
CreateNativeContext();
}
~CompressionContext() {
DestroyNativeContext();
}
CompressionContext(const CompressionContext&) = delete;
CompressionContext& operator=(const CompressionContext&) = delete;
const CompressionOptions& options() const {
return opts_;
}
CompressionType type() const {
return type_;
}
const Slice& dict() const {
return dict_;
}
Slice& dict() {
return dict_;
}
};
// Instantiate this class and pass it to the uncompression API below
class UncompressionContext {
private:
CompressionType type_;
Slice dict_;
CompressionContextCache* ctx_cache_ = nullptr;
ZSTDUncompressCachedData uncomp_cached_data_;
public:
struct NoCache {};
// Do not use context cache, used by TableBuilder
UncompressionContext(NoCache, CompressionType comp_type) :
type_(comp_type) {
}
explicit UncompressionContext(CompressionType comp_type) :
UncompressionContext(comp_type, Slice()) {
}
UncompressionContext(CompressionType comp_type, const Slice& comp_dict) :
type_(comp_type), dict_(comp_dict) {
if (type_ == kZSTD) {
ctx_cache_ = CompressionContextCache::Instance();
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
}
}
~UncompressionContext() {
if (type_ == kZSTD &&
uncomp_cached_data_.GetCacheIndex() != -1) {
assert(ctx_cache_ != nullptr);
ctx_cache_->ReturnCachedZSTDUncompressData(
uncomp_cached_data_.GetCacheIndex());
}
}
UncompressionContext(const UncompressionContext&) = delete;
UncompressionContext& operator=(const UncompressionContext&) = delete;
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
return uncomp_cached_data_.Get();
}
CompressionType type() const {
return type_;
}
const Slice& dict() const {
return dict_;
}
Slice& dict() {
return dict_;
}
};
inline bool Snappy_Supported() { inline bool Snappy_Supported() {
#ifdef SNAPPY #ifdef SNAPPY
return true; return true;
@ -162,7 +364,7 @@ inline std::string CompressionTypeToString(CompressionType compression_type) {
// 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the
// start of compressed block. Snappy format is the same as version 1. // start of compressed block. Snappy format is the same as version 1.
inline bool Snappy_Compress(const CompressionOptions& /*opts*/, inline bool Snappy_Compress(const CompressionContext& /*ctx*/,
const char* input, size_t length, const char* input, size_t length,
::std::string* output) { ::std::string* output) {
#ifdef SNAPPY #ifdef SNAPPY
@ -229,10 +431,9 @@ inline bool GetDecompressedSizeInfo(const char** input_data,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool Zlib_Compress(const CompressionOptions& opts, inline bool Zlib_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output, size_t length, ::std::string* output) {
const Slice& compression_dict = Slice()) {
#ifdef ZLIB #ifdef ZLIB
if (length > std::numeric_limits<uint32_t>::max()) { if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB // Can't compress more than 4GB
@ -255,24 +456,24 @@ inline bool Zlib_Compress(const CompressionOptions& opts,
// The default value is 8. See zconf.h for more details. // The default value is 8. See zconf.h for more details.
static const int memLevel = 8; static const int memLevel = 8;
int level; int level;
if (opts.level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
level = Z_DEFAULT_COMPRESSION; level = Z_DEFAULT_COMPRESSION;
} else { } else {
level = opts.level; level = ctx.options().level;
} }
z_stream _stream; z_stream _stream;
memset(&_stream, 0, sizeof(z_stream)); memset(&_stream, 0, sizeof(z_stream));
int st = deflateInit2(&_stream, level, Z_DEFLATED, opts.window_bits, int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits,
memLevel, opts.strategy); memLevel, ctx.options().strategy);
if (st != Z_OK) { if (st != Z_OK) {
return false; return false;
} }
if (compression_dict.size()) { if (ctx.dict().size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = deflateSetDictionary( st = deflateSetDictionary(
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()), &_stream, reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(compression_dict.size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
deflateEnd(&_stream); deflateEnd(&_stream);
return false; return false;
@ -300,12 +501,11 @@ inline bool Zlib_Compress(const CompressionOptions& opts,
deflateEnd(&_stream); deflateEnd(&_stream);
return compressed; return compressed;
#else #else
(void)opts; (void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)output;
(void)compression_dict;
return false; return false;
#endif #endif
} }
@ -316,10 +516,10 @@ inline bool Zlib_Compress(const CompressionOptions& opts,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* Zlib_Uncompress(const char* input_data, size_t input_length, inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_data,
size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version, uint32_t compress_format_version,
const Slice& compression_dict = Slice(),
int windowBits = -14) { int windowBits = -14) {
#ifdef ZLIB #ifdef ZLIB
uint32_t output_len = 0; uint32_t output_len = 0;
@ -349,11 +549,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
return nullptr; return nullptr;
} }
if (compression_dict.size()) { if (ctx.dict().size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = inflateSetDictionary( st = inflateSetDictionary(
&_stream, reinterpret_cast<const Bytef*>(compression_dict.data()), &_stream, reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(compression_dict.size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
return nullptr; return nullptr;
} }
@ -406,11 +606,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
inflateEnd(&_stream); inflateEnd(&_stream);
return output; return output;
#else #else
(void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
(void)compress_format_version; (void)compress_format_version;
(void)compression_dict;
(void)windowBits; (void)windowBits;
return nullptr; return nullptr;
#endif #endif
@ -420,7 +620,7 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length,
// block header // block header
// compress_format_version == 2 -- decompressed size is included in the block // compress_format_version == 2 -- decompressed size is included in the block
// header in varint32 format // header in varint32 format
inline bool BZip2_Compress(const CompressionOptions& /*opts*/, inline bool BZip2_Compress(const CompressionContext& /*ctx*/,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output) { size_t length, ::std::string* output) {
#ifdef BZIP2 #ifdef BZIP2
@ -567,10 +767,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool LZ4_Compress(const CompressionOptions& /*opts*/, inline bool LZ4_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output, size_t length, ::std::string* output) {
const Slice compression_dict = Slice()) {
#ifdef LZ4 #ifdef LZ4
if (length > std::numeric_limits<uint32_t>::max()) { if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB // Can't compress more than 4GB
@ -596,9 +795,9 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/,
int outlen; int outlen;
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_stream_t* stream = LZ4_createStream(); LZ4_stream_t* stream = LZ4_createStream();
if (compression_dict.size()) { if (ctx.dict().size()) {
LZ4_loadDict(stream, compression_dict.data(), LZ4_loadDict(stream, ctx.dict().data(),
static_cast<int>(compression_dict.size())); static_cast<int>(ctx.dict().size()));
} }
#if LZ4_VERSION_NUMBER >= 10700 // r129+ #if LZ4_VERSION_NUMBER >= 10700 // r129+
outlen = outlen =
@ -621,11 +820,11 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/,
output->resize(static_cast<size_t>(output_header_len + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#else // LZ4 #else // LZ4
(void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)output;
(void)compression_dict;
return false; return false;
#endif #endif
} }
@ -636,10 +835,10 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* LZ4_Uncompress(const char* input_data, size_t input_length, inline char* LZ4_Uncompress(const UncompressionContext& ctx, const char* input_data,
size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version, uint32_t compress_format_version) {
const Slice& compression_dict = Slice()) {
#ifdef LZ4 #ifdef LZ4
uint32_t output_len = 0; uint32_t output_len = 0;
if (compress_format_version == 2) { if (compress_format_version == 2) {
@ -662,9 +861,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
char* output = new char[output_len]; char* output = new char[output_len];
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); LZ4_streamDecode_t* stream = LZ4_createStreamDecode();
if (compression_dict.size()) { if (ctx.dict().size()) {
LZ4_setStreamDecode(stream, compression_dict.data(), LZ4_setStreamDecode(stream, ctx.dict().data(),
static_cast<int>(compression_dict.size())); static_cast<int>(ctx.dict().size()));
} }
*decompress_size = LZ4_decompress_safe_continue( *decompress_size = LZ4_decompress_safe_continue(
stream, input_data, output, static_cast<int>(input_length), stream, input_data, output, static_cast<int>(input_length),
@ -683,11 +882,11 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
assert(*decompress_size == static_cast<int>(output_len)); assert(*decompress_size == static_cast<int>(output_len));
return output; return output;
#else // LZ4 #else // LZ4
(void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
(void)compress_format_version; (void)compress_format_version;
(void)compression_dict;
return nullptr; return nullptr;
#endif #endif
} }
@ -698,10 +897,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool LZ4HC_Compress(const CompressionOptions& opts, inline bool LZ4HC_Compress(const CompressionContext& ctx,
uint32_t compress_format_version, const char* input, uint32_t compress_format_version, const char* input,
size_t length, ::std::string* output, size_t length, ::std::string* output) {
const Slice& compression_dict = Slice()) {
#ifdef LZ4 #ifdef LZ4
if (length > std::numeric_limits<uint32_t>::max()) { if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB // Can't compress more than 4GB
@ -726,17 +924,17 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts,
int outlen; int outlen;
int level; int level;
if (opts.level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
level = 0; // lz4hc.h says any value < 1 will be sanitized to default level = 0; // lz4hc.h says any value < 1 will be sanitized to default
} else { } else {
level = opts.level; level = ctx.options().level;
} }
#if LZ4_VERSION_NUMBER >= 10400 // r124+ #if LZ4_VERSION_NUMBER >= 10400 // r124+
LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_streamHC_t* stream = LZ4_createStreamHC();
LZ4_resetStreamHC(stream, level); LZ4_resetStreamHC(stream, level);
const char* compression_dict_data = const char* compression_dict_data =
compression_dict.size() > 0 ? compression_dict.data() : nullptr; ctx.dict().size() > 0 ? ctx.dict().data() : nullptr;
size_t compression_dict_size = compression_dict.size(); size_t compression_dict_size = ctx.dict().size();
LZ4_loadDictHC(stream, compression_dict_data, LZ4_loadDictHC(stream, compression_dict_data,
static_cast<int>(compression_dict_size)); static_cast<int>(compression_dict_size));
@ -767,12 +965,11 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts,
output->resize(static_cast<size_t>(output_header_len + outlen)); output->resize(static_cast<size_t>(output_header_len + outlen));
return true; return true;
#else // LZ4 #else // LZ4
(void)opts; (void)ctx;
(void)compress_format_version; (void)compress_format_version;
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)output;
(void)compression_dict;
return false; return false;
#endif #endif
} }
@ -804,9 +1001,8 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/,
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
size_t length, ::std::string* output, size_t length, ::std::string* output) {
const Slice& compression_dict = Slice()) {
#ifdef ZSTD #ifdef ZSTD
if (length > std::numeric_limits<uint32_t>::max()) { if (length > std::numeric_limits<uint32_t>::max()) {
// Can't compress more than 4GB // Can't compress more than 4GB
@ -818,21 +1014,21 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input,
size_t compressBound = ZSTD_compressBound(length); size_t compressBound = ZSTD_compressBound(length);
output->resize(static_cast<size_t>(output_header_len + compressBound)); output->resize(static_cast<size_t>(output_header_len + compressBound));
size_t outlen; size_t outlen = 0;
int level; int level;
if (opts.level == CompressionOptions::kDefaultCompressionLevel) { if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) {
// 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see
// https://github.com/facebook/zstd/issues/1148 // https://github.com/facebook/zstd/issues/1148
level = 3; level = 3;
} else { } else {
level = opts.level; level = ctx.options().level;
} }
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_CCtx* context = ZSTD_createCCtx(); ZSTD_CCtx* context = ctx.ZSTDPreallocCtx();
assert(context != nullptr);
outlen = ZSTD_compress_usingDict( outlen = ZSTD_compress_usingDict(
context, &(*output)[output_header_len], compressBound, input, length, context, &(*output)[output_header_len], compressBound, input, length,
compression_dict.data(), compression_dict.size(), level); ctx.dict().data(), ctx.dict().size(), level);
ZSTD_freeCCtx(context);
#else // up to v0.4.x #else // up to v0.4.x
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
length, level); length, level);
@ -843,20 +1039,19 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input,
output->resize(output_header_len + outlen); output->resize(output_header_len + outlen);
return true; return true;
#else // ZSTD #else // ZSTD
(void)opts; (void)ctx;
(void)input; (void)input;
(void)length; (void)length;
(void)output; (void)output;
(void)compression_dict;
return false; return false;
#endif #endif
} }
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_data,
int* decompress_size, size_t input_length,
const Slice& compression_dict = Slice()) { int* decompress_size) {
#ifdef ZSTD #ifdef ZSTD
uint32_t output_len = 0; uint32_t output_len = 0;
if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, if (!compression::GetDecompressedSizeInfo(&input_data, &input_length,
@ -867,11 +1062,11 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length,
char* output = new char[output_len]; char* output = new char[output_len];
size_t actual_output_length; size_t actual_output_length;
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_DCtx* context = ZSTD_createDCtx(); ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict( actual_output_length = ZSTD_decompress_usingDict(
context, output, output_len, input_data, input_length, context, output, output_len, input_data, input_length,
compression_dict.data(), compression_dict.size()); ctx.dict().data(), ctx.dict().size());
ZSTD_freeDCtx(context);
#else // up to v0.4.x #else // up to v0.4.x
actual_output_length = actual_output_length =
ZSTD_decompress(output, output_len, input_data, input_length); ZSTD_decompress(output, output_len, input_data, input_length);
@ -880,10 +1075,10 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length,
*decompress_size = static_cast<int>(actual_output_length); *decompress_size = static_cast<int>(actual_output_length);
return output; return output;
#else // ZSTD #else // ZSTD
(void)ctx;
(void)input_data; (void)input_data;
(void)input_length; (void)input_length;
(void)decompress_size; (void)decompress_size;
(void)compression_dict;
return nullptr; return nullptr;
#endif #endif
} }

@ -0,0 +1,111 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
#include "util/compression_context_cache.h"
#include "util/compression.h"
#include "util/core_local.h"
#include <atomic>
namespace rocksdb {
namespace compression_cache {
void* const SentinelValue = nullptr;
// Cache ZSTD uncompression contexts for reads
// if needed we can add ZSTD compression context caching
// which is currently is not done since BlockBasedTableBuilder
// simply creates one compression context per new SST file.
struct ZSTDCachedData {
// We choose to cache the below structure instead of a ptr
// because we want to avoid a) native types leak b) make
// cache use transparent for the user
ZSTDUncompressCachedData uncomp_cached_data_;
std::atomic<void*> zstd_uncomp_sentinel_;
char padding[(CACHE_LINE_SIZE -
(sizeof(ZSTDUncompressCachedData) +
sizeof(std::atomic<void*>)) %
CACHE_LINE_SIZE)]; // unused padding field
ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {}
ZSTDCachedData(const ZSTDCachedData&) = delete;
ZSTDCachedData& operator=(const ZSTDCachedData&) = delete;
ZSTDUncompressCachedData GetUncompressData(int64_t idx) {
ZSTDUncompressCachedData result;
void* expected = &uncomp_cached_data_;
if (zstd_uncomp_sentinel_.compare_exchange_strong(expected, SentinelValue)) {
uncomp_cached_data_.CreateIfNeeded();
result.InitFromCache(uncomp_cached_data_, idx);
} else {
// Creates one time use data
result.CreateIfNeeded();
}
return result;
}
// Return the entry back into circulation
// This is executed only when we successfully obtained
// in the first place
void ReturnUncompressData() {
if (zstd_uncomp_sentinel_.exchange(&uncomp_cached_data_) != SentinelValue) {
// Means we are returning while not having it acquired.
assert(false);
}
}
};
static_assert(sizeof(ZSTDCachedData) % CACHE_LINE_SIZE == 0, "Expected CACHE_LINE_SIZE alignment");
} // compression_cache
using namespace compression_cache;
class CompressionContextCache::Rep {
public:
Rep() {
}
ZSTDUncompressCachedData GetZSTDUncompressData() {
auto p = per_core_uncompr_.AccessElementAndIndex();
int64_t idx = static_cast<int64_t>(p.second);
return p.first->GetUncompressData(idx);
}
void ReturnZSTDUncompressData(int64_t idx) {
assert(idx >= 0);
auto* cn = per_core_uncompr_.AccessAtCore(static_cast<size_t>(idx));
cn->ReturnUncompressData();
}
private:
CoreLocalArray<ZSTDCachedData> per_core_uncompr_;
};
CompressionContextCache::CompressionContextCache() :
rep_(new Rep()) {
}
CompressionContextCache* CompressionContextCache::Instance() {
static CompressionContextCache instance;
return &instance;
}
void CompressionContextCache::InitSingleton() {
Instance();
}
ZSTDUncompressCachedData CompressionContextCache::GetCachedZSTDUncompressData() {
return rep_->GetZSTDUncompressData();
}
void CompressionContextCache::ReturnCachedZSTDUncompressData(int64_t idx) {
rep_->ReturnZSTDUncompressData(idx);
}
CompressionContextCache::~CompressionContextCache() {
delete rep_;
}
}

@ -0,0 +1,45 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
//
// Compression context cache allows to cache compression/uncompression contexts
// This helps with Random Read latencies and reduces CPU utilization
// Caching is implemented using CoreLocal facility. Compression/Uncompression
// instances are cached on a per core basis using CoreLocalArray. A borrowed
// instance is atomically replaced with a sentinel value for the time of being used.
// If it turns out that another thread is already makes use of the instance we still
// create one on the heap which is later is destroyed.
#pragma once
#include <stdint.h>
namespace rocksdb {
class ZSTDUncompressCachedData;
class CompressionContextCache {
public:
// Singleton
static CompressionContextCache* Instance();
static void InitSingleton();
CompressionContextCache(const CompressionContextCache&) = delete;
CompressionContextCache& operator=(const CompressionContextCache&) = delete;
ZSTDUncompressCachedData GetCachedZSTDUncompressData();
void ReturnCachedZSTDUncompressData(int64_t idx);
private:
// Singleton
CompressionContextCache();
~CompressionContextCache();
class Rep;
Rep* rep_;
};
}

@ -45,6 +45,9 @@ class ThreadLocalPtr {
public: public:
explicit ThreadLocalPtr(UnrefHandler handler = nullptr); explicit ThreadLocalPtr(UnrefHandler handler = nullptr);
ThreadLocalPtr(const ThreadLocalPtr&) = delete;
ThreadLocalPtr& operator=(const ThreadLocalPtr&) = delete;
~ThreadLocalPtr(); ~ThreadLocalPtr();
// Return the current pointer stored in thread local // Return the current pointer stored in thread local

@ -773,9 +773,9 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
} }
StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
CompressionType ct = bdb_options_.compression; CompressionType ct = bdb_options_.compression;
CompressionOptions compression_opts; CompressionContext compression_ctx(ct);
CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat, CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat,
Slice(), compression_output); compression_output);
return *compression_output; return *compression_output;
} }
@ -1120,9 +1120,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
{ {
StopWatch decompression_sw(env_, statistics_, StopWatch decompression_sw(env_, statistics_,
BLOB_DB_DECOMPRESSION_MICROS); BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext uncompression_ctx(bfile->compression());
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
uncompression_ctx,
blob_value.data(), blob_value.size(), &contents, blob_value.data(), blob_value.size(), &contents,
kBlockBasedTableVersionFormat, Slice(), bfile->compression(), kBlockBasedTableVersionFormat,
*(cfh->cfd()->ioptions())); *(cfh->cfd()->ioptions()));
} }
value->PinSelf(contents.data); value->PinSelf(contents.data);

@ -205,9 +205,11 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
if (compression != kNoCompression && if (compression != kNoCompression &&
(show_uncompressed_blob != DisplayType::kNone || show_summary)) { (show_uncompressed_blob != DisplayType::kNone || show_summary)) {
BlockContents contents; BlockContents contents;
UncompressionContext uncompression_ctx(compression);
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
uncompression_ctx,
slice.data() + key_size, value_size, &contents, slice.data() + key_size, value_size, &contents,
2 /*compress_format_version*/, Slice(), compression, 2 /*compress_format_version*/,
ImmutableCFOptions(Options())); ImmutableCFOptions(Options()));
if (!s.ok()) { if (!s.ok()) {
return s; return s;

@ -84,16 +84,16 @@ void ColumnAwareEncodingReader::DecodeBlocks(
auto& slice_final_with_bit = block; auto& slice_final_with_bit = block;
uint32_t format_version = 2; uint32_t format_version = 2;
Slice compression_dict;
BlockContents contents; BlockContents contents;
const char* content_ptr; const char* content_ptr;
CompressionType type = CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) { if (type != kNoCompression) {
UncompressBlockContents(slice_final_with_bit.c_str(), UncompressionContext uncompression_ctx(type);
UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents, slice_final_with_bit.size() - 1, &contents,
format_version, compression_dict, ioptions); format_version, ioptions);
content_ptr = contents.data.data(); content_ptr = contents.data.data();
} else { } else {
content_ptr = slice_final_with_bit.data(); content_ptr = slice_final_with_bit.data();
@ -166,16 +166,17 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
for (auto& block : *blocks) { for (auto& block : *blocks) {
auto& slice_final_with_bit = block; auto& slice_final_with_bit = block;
uint32_t format_version = 2; uint32_t format_version = 2;
Slice compression_dict;
BlockContents contents; BlockContents contents;
std::string decoded_content; std::string decoded_content;
CompressionType type = CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) { if (type != kNoCompression) {
UncompressBlockContents(slice_final_with_bit.c_str(), UncompressionContext uncompression_ctx(type);
UncompressBlockContents(uncompression_ctx,
slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents, slice_final_with_bit.size() - 1, &contents,
format_version, compression_dict, ioptions); format_version, ioptions);
decoded_content = std::string(contents.data.data(), contents.data.size()); decoded_content = std::string(contents.data.data(), contents.data.size());
} else { } else {
decoded_content = std::move(slice_final_with_bit); decoded_content = std::move(slice_final_with_bit);
@ -244,12 +245,11 @@ namespace {
void CompressDataBlock(const std::string& output_content, Slice* slice_final, void CompressDataBlock(const std::string& output_content, Slice* slice_final,
CompressionType* type, std::string* compressed_output) { CompressionType* type, std::string* compressed_output) {
CompressionOptions compression_opts; CompressionContext compression_ctx(*type);
uint32_t format_version = 2; // hard-coded version uint32_t format_version = 2; // hard-coded version
Slice compression_dict;
*slice_final = *slice_final =
CompressBlock(output_content, compression_opts, type, format_version, CompressBlock(output_content, compression_ctx, type, format_version,
compression_dict, compressed_output); compressed_output);
} }
} // namespace } // namespace

Loading…
Cancel
Save