From f4b72d70567003583e19a8bd7a98a5770299ac52 Mon Sep 17 00:00:00 2001 From: Dmitri Smirnov Date: Mon, 4 Jun 2018 12:04:52 -0700 Subject: [PATCH] Provide a way to override windows memory allocator with jemalloc for ZSTD Summary: Windows does not have an LD_PRELOAD mechanism to override all memory allocation functions, and ZSTD makes use of the C-runtime calloc. During flushes and compactions the default system allocator fragments memory and the system slows down considerably. For builds with jemalloc we employ an advanced ZSTD context creation API that re-directs memory allocation to jemalloc. To reduce the cost of context creation on each block we cache the ZSTD context within the block based table builder while a new SST file is being built; this helps all platform builds, including those w/o jemalloc. This avoids system allocator fragmentation and improves the performance. The change does not address random reads, and currently on Windows reads with ZSTD regress as compared with SNAPPY compression. Closes https://github.com/facebook/rocksdb/pull/3838 Differential Revision: D8229794 Pulled By: miasantreble fbshipit-source-id: 719b622ab7bf4109819bc44f45ec66f0dd3ee80d --- CMakeLists.txt | 19 +- TARGETS | 1 + port/win/env_default.cc | 12 +- port/win/win_jemalloc.cc | 19 ++ src.mk | 1 + table/block_based_table_builder.cc | 76 +++--- table/block_based_table_builder.h | 12 +- table/block_based_table_reader.cc | 13 +- table/block_fetcher.cc | 6 +- table/format.cc | 35 ++- table/format.h | 11 +- thirdparty.inc | 100 +++++--- tools/db_bench_tool.cc | 47 ++-- util/compression.h | 317 +++++++++++++++++++----- util/compression_context_cache.cc | 111 +++++++++ util/compression_context_cache.h | 45 ++++ util/thread_local.h | 3 + utilities/blob_db/blob_db_impl.cc | 10 +- utilities/blob_db/blob_dump_tool.cc | 4 +- utilities/column_aware_encoding_util.cc | 20 +- 20 files changed, 660 insertions(+), 202 deletions(-) create mode 100644 util/compression_context_cache.cc create mode 100644 
util/compression_context_cache.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cadb0a8cf..6fedf455a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -322,14 +322,24 @@ if(DEFINED USE_RTTI) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") else() - message(STATUS "Disabling RTTI") - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + if(MSVC) + message(STATUS "Disabling RTTI in Release builds. Always on in Debug.") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-") + else() + message(STATUS "Disabling RTTI in Release builds") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + endif() endif() else() message(STATUS "Enabling RTTI in Debug builds only (default)") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + if(MSVC) + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-") + else() + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + endif() endif() if(MSVC) @@ -574,6 +584,7 @@ set(SOURCES util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc + util/compression_context_cache.cc util/concurrent_arena.cc util/crc32c.cc util/delete_scheduler.cc diff --git a/TARGETS b/TARGETS index d130f512b..3e8a2b78d 100644 --- a/TARGETS +++ b/TARGETS @@ -200,6 +200,7 @@ cpp_library( "util/coding.cc", "util/compaction_job_stats_impl.cc", "util/comparator.cc", + "util/compression_context_cache.cc", "util/concurrent_arena.cc", "util/crc32c.cc", "util/delete_scheduler.cc", diff --git a/port/win/env_default.cc b/port/win/env_default.cc index 52a984f74..b99cf9e94 100644 --- 
a/port/win/env_default.cc +++ b/port/win/env_default.cc @@ -11,16 +11,13 @@ #include #include "port/win/env_win.h" +#include "util/compression_context_cache.h" +#include "util/thread_local.h" namespace rocksdb { namespace port { -// We choose to create this on the heap and using std::once for the following -// reasons -// 1) Currently available MS compiler does not implement atomic C++11 -// initialization of -// function local statics -// 2) We choose not to destroy the env because joining the threads from the +// We choose not to destroy the env because joining the threads from the // system loader // which destroys the statics (same as from DLLMain) creates a system loader // dead-lock. @@ -29,11 +26,12 @@ namespace { std::once_flag winenv_once_flag; Env* envptr; }; - } Env* Env::Default() { using namespace port; + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); return envptr; } diff --git a/port/win/win_jemalloc.cc b/port/win/win_jemalloc.cc index fc46e189c..4f84c0734 100644 --- a/port/win/win_jemalloc.cc +++ b/port/win/win_jemalloc.cc @@ -14,6 +14,25 @@ #include #include "jemalloc/jemalloc.h" +#if defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY) +#include +#if (ZSTD_VERSION_NUMBER >= 500) +namespace rocksdb { +namespace port { +void* JemallocAllocateForZSTD(void* /* opaque */, size_t size) { + return je_malloc(size); +} +void JemallocDeallocateForZSTD(void* /* opaque */, void* address) { + je_free(address); +} +ZSTD_customMem GetJeZstdAllocationOverrides() { + return { JemallocAllocateForZSTD, JemallocDeallocateForZSTD, nullptr }; +} +} // namespace port +} // namespace rocksdb +#endif // (ZSTD_VERSION_NUMBER >= 500) +#endif // defined(ZSTD) defined(ZSTD_STATIC_LINKING_ONLY) + // Global operators to be replaced by a linker when this file is // a part of the build diff --git a/src.mk b/src.mk index 48958784d..047eaa3f2 100644 --- a/src.mk +++ b/src.mk @@ -129,6 
+129,7 @@ LIB_SOURCES = \ util/coding.cc \ util/compaction_job_stats_impl.cc \ util/comparator.cc \ + util/compression_context_cache.cc \ util/concurrent_arena.cc \ util/crc32c.cc \ util/delete_scheduler.cc \ diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index a742f6327..43c1e999c 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -104,19 +104,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { // format_version is the block format as defined in include/rocksdb/table.h Slice CompressBlock(const Slice& raw, - const CompressionOptions& compression_options, + const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, - const Slice& compression_dict, std::string* compressed_output) { - if (*type == kNoCompression) { + *type = compression_ctx.type(); + if (compression_ctx.type() == kNoCompression) { return raw; } // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". - switch (*type) { + switch (compression_ctx.type()) { case kSnappyCompression: - if (Snappy_Compress(compression_options, raw.data(), raw.size(), + if (Snappy_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -124,16 +124,16 @@ Slice CompressBlock(const Slice& raw, break; // fall back to no compression. case kZlibCompression: if (Zlib_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kZlibCompression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. 
case kBZip2Compression: if (BZip2_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kBZip2Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -142,18 +142,18 @@ Slice CompressBlock(const Slice& raw, break; // fall back to no compression. case kLZ4Compression: if (LZ4_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kLZ4Compression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4HCCompression: if (LZ4HC_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kLZ4HCCompression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -167,8 +167,8 @@ Slice CompressBlock(const Slice& raw, break; case kZSTD: case kZSTDNotFinalCompression: - if (ZSTD_Compress(compression_options, raw.data(), raw.size(), - compressed_output, compression_dict) && + if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -261,10 +261,10 @@ struct BlockBasedTableBuilder::Rep { PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; - const CompressionType compression_type; - const CompressionOptions compression_opts; - // Data for presetting the compression library's dictionary, or nullptr. 
- const std::string* compression_dict; + // Compression dictionary or nullptr + const std::string* compression_dict; + CompressionContext compression_ctx; + std::unique_ptr verify_ctx; TableProperties props; bool closed = false; // Either Finish() or Abandon() has been called. @@ -306,9 +306,8 @@ struct BlockBasedTableBuilder::Rep { table_options.use_delta_encoding), range_del_block(1 /* block_restart_interval */), internal_prefix_transform(_moptions.prefix_extractor.get()), - compression_type(_compression_type), - compression_opts(_compression_opts), compression_dict(_compression_dict), + compression_ctx(_compression_type, _compression_opts), compressed_cache_key_prefix_size(0), flush_block_policy( table_options.flush_block_policy_factory->NewFlushBlockPolicy( @@ -342,6 +341,16 @@ struct BlockBasedTableBuilder::Rep { new BlockBasedTablePropertiesCollector( table_options.index_type, table_options.whole_key_filtering, _moptions.prefix_extractor != nullptr)); + if (table_options.verify_compression) { + verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), + compression_ctx.type())); + } + } + + Rep(const Rep&) = delete; + Rep& operator=(const Rep&) = delete; + + ~Rep() { } }; @@ -480,7 +489,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, assert(ok()); Rep* r = rep_; - auto type = r->compression_type; + auto type = r->compression_ctx.type(); Slice block_contents; bool abort_compression = false; @@ -490,12 +499,23 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (raw_block_contents.size() < kCompressionSizeLimit) { Slice compression_dict; if (is_data_block && r->compression_dict && r->compression_dict->size()) { - compression_dict = *r->compression_dict; + r->compression_ctx.dict() = *r->compression_dict; + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = *r->compression_dict; + } + } else { + // Clear dictionary + 
r->compression_ctx.dict() = Slice(); + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = Slice(); + } } - block_contents = CompressBlock(raw_block_contents, r->compression_opts, - &type, r->table_options.format_version, - compression_dict, &r->compressed_output); + block_contents = CompressBlock(raw_block_contents, r->compression_ctx, + &type, r->table_options.format_version, + &r->compressed_output); // Some of the compression algorithms are known to be unreliable. If // the verify_compression flag is set then try to de-compress the @@ -503,9 +523,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer BlockContents contents; - Status stat = UncompressBlockContentsForCompressionType( + Status stat = UncompressBlockContentsForCompressionType(*r->verify_ctx, block_contents.data(), block_contents.size(), &contents, - r->table_options.format_version, compression_dict, type, + r->table_options.format_version, r->ioptions); if (stat.ok()) { @@ -739,7 +759,7 @@ Status BlockBasedTableBuilder::Finish() { r->props.merge_operator_name = r->ioptions.merge_operator != nullptr ? r->ioptions.merge_operator->Name() : "nullptr"; - r->props.compression_name = CompressionTypeToString(r->compression_type); + r->props.compression_name = CompressionTypeToString(r->compression_ctx.type()); r->props.prefix_extractor_name = r->moptions.prefix_extractor != nullptr ? 
r->moptions.prefix_extractor->Name() diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index a0ba87f14..42dc953f0 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -19,6 +19,7 @@ #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" +#include "util/compression.h" namespace rocksdb { @@ -53,6 +54,10 @@ class BlockBasedTableBuilder : public TableBuilder { // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); + // No copying allowed + BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete; + BlockBasedTableBuilder& operator=(const BlockBasedTableBuilder&) = delete; + // Add key,value to the table being constructed. // REQUIRES: key is after any previously added key according to comparator. // REQUIRES: Finish(), Abandon() have not been called @@ -115,16 +120,11 @@ class BlockBasedTableBuilder : public TableBuilder { // Some compression libraries fail when the raw size is bigger than int. 
If // uncompressed size is bigger than kCompressionSizeLimit, don't compress it const uint64_t kCompressionSizeLimit = std::numeric_limits::max(); - - // No copying allowed - BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete; - void operator=(const BlockBasedTableBuilder&) = delete; }; Slice CompressBlock(const Slice& raw, - const CompressionOptions& compression_options, + const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, - const Slice& compression_dict, std::string* compressed_output); } // namespace rocksdb diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 2462b880f..d384981e7 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1104,9 +1104,12 @@ Status BlockBasedTable::GetDataBlockFromCache( // Retrieve the uncompressed contents into a new buffer BlockContents contents; - s = UncompressBlockContents(compressed_block->data(), + UncompressionContext uncompresssion_ctx(compressed_block->compression_type(), + compression_dict); + s = UncompressBlockContents(uncompresssion_ctx, + compressed_block->data(), compressed_block->size(), &contents, - format_version, compression_dict, + format_version, ioptions); // Insert uncompressed block into block cache @@ -1182,8 +1185,10 @@ Status BlockBasedTable::PutDataBlockToCache( BlockContents contents; Statistics* statistics = ioptions.statistics; if (raw_block->compression_type() != kNoCompression) { - s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents, - format_version, compression_dict, ioptions); + UncompressionContext uncompression_ctx(raw_block->compression_type(), compression_dict); + s = UncompressBlockContents(uncompression_ctx, raw_block->data(), + raw_block->size(), &contents, + format_version, ioptions); } if (!s.ok()) { delete raw_block; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index fd01ac1c6..6a2f5f995 100644 --- a/table/block_fetcher.cc 
+++ b/table/block_fetcher.cc @@ -225,8 +225,10 @@ Status BlockFetcher::ReadBlockContents() { if (do_uncompress_ && compression_type != kNoCompression) { // compressed page, uncompress, update cache - status_ = UncompressBlockContents(slice_.data(), block_size_, contents_, - footer_.version(), compression_dict_, + UncompressionContext uncompression_ctx(compression_type, compression_dict_); + status_ = UncompressBlockContents(uncompression_ctx, + slice_.data(), block_size_, contents_, + footer_.version(), ioptions_); } else { GetBlockContents(); diff --git a/table/format.cc b/table/format.cc index 534d8c466..3a8f80e34 100644 --- a/table/format.cc +++ b/table/format.cc @@ -264,18 +264,17 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, return Status::OK(); } -Status UncompressBlockContentsForCompressionType( +Status UncompressBlockContentsForCompressionType(const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, - uint32_t format_version, const Slice& compression_dict, - CompressionType compression_type, const ImmutableCFOptions &ioptions) { + uint32_t format_version, const ImmutableCFOptions &ioptions) { std::unique_ptr ubuf; - assert(compression_type != kNoCompression && "Invalid compression type"); + assert(uncompression_ctx.type() != kNoCompression && "Invalid compression type"); StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); int decompress_size = 0; - switch (compression_type) { + switch (uncompression_ctx.type()) { case kSnappyCompression: { size_t ulength = 0; static char snappy_corrupt_msg[] = @@ -291,10 +290,9 @@ Status UncompressBlockContentsForCompressionType( break; } case kZlibCompression: - ubuf.reset(Zlib_Uncompress( + ubuf.reset(Zlib_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kZlibCompression, format_version), - compression_dict)); + GetCompressFormatForVersion(kZlibCompression, format_version))); if (!ubuf) 
{ static char zlib_corrupt_msg[] = "Zlib not supported or corrupted Zlib compressed block contents"; @@ -316,10 +314,9 @@ Status UncompressBlockContentsForCompressionType( BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4Compression: - ubuf.reset(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4Compression, format_version), - compression_dict)); + GetCompressFormatForVersion(kLZ4Compression, format_version))); if (!ubuf) { static char lz4_corrupt_msg[] = "LZ4 not supported or corrupted LZ4 compressed block contents"; @@ -329,10 +326,9 @@ Status UncompressBlockContentsForCompressionType( BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4HCCompression: - ubuf.reset(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4HCCompression, format_version), - compression_dict)); + GetCompressFormatForVersion(kLZ4HCCompression, format_version))); if (!ubuf) { static char lz4hc_corrupt_msg[] = "LZ4HC not supported or corrupted LZ4HC compressed block contents"; @@ -353,7 +349,7 @@ Status UncompressBlockContentsForCompressionType( break; case kZSTD: case kZSTDNotFinalCompression: - ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict)); + ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size)); if (!ubuf) { static char zstd_corrupt_msg[] = "ZSTD not supported or corrupted ZSTD compressed block contents"; @@ -383,14 +379,15 @@ Status UncompressBlockContentsForCompressionType( // buffer is returned via 'result' and it is upto the caller to // free this buffer. 
// format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const char* data, size_t n, +Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, + const char* data, size_t n, BlockContents* contents, uint32_t format_version, - const Slice& compression_dict, const ImmutableCFOptions &ioptions) { assert(data[n] != kNoCompression); + assert(data[n] == uncompression_ctx.type()); return UncompressBlockContentsForCompressionType( - data, n, contents, format_version, compression_dict, - (CompressionType)data[n], ioptions); + uncompression_ctx, data, n, contents, + format_version, ioptions); } } // namespace rocksdb diff --git a/table/format.h b/table/format.h index f3737fc21..15f167be0 100644 --- a/table/format.h +++ b/table/format.h @@ -228,19 +228,20 @@ extern Status ReadBlockContents( // free this buffer. // For description of compress_format_version and possible values, see // util/compression.h -extern Status UncompressBlockContents(const char* data, size_t n, +extern Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, + const char* data, size_t n, BlockContents* contents, uint32_t compress_format_version, - const Slice& compression_dict, - const ImmutableCFOptions& ioptions); + const ImmutableCFOptions &ioptions); // This is an extension to UncompressBlockContents that accepts // a specific compression type. This is used by un-wrapped blocks // with no compression header. extern Status UncompressBlockContentsForCompressionType( + const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, - uint32_t compress_format_version, const Slice& compression_dict, - CompressionType compression_type, const ImmutableCFOptions& ioptions); + uint32_t compress_format_version, + const ImmutableCFOptions& ioptions); // Implementation details follow. 
Clients should ignore, diff --git a/thirdparty.inc b/thirdparty.inc index 26c50d99f..f40b81fec 100644 --- a/thirdparty.inc +++ b/thirdparty.inc @@ -12,9 +12,9 @@ set (THIRDPARTY_LIBS "") # Initialization, don't touch # Defaults # set(GFLAGS_HOME $ENV{THIRDPARTY_HOME}/Gflags.Library) -set(GFLAGS_INCLUDE ${GFLAGS_HOME}/inc/include) -set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/bin/debug/amd64/gflags.lib) -set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/bin/retail/amd64/gflags.lib) +set(GFLAGS_INCLUDE ${GFLAGS_HOME}/build/native/include) +set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/lib/native/debug/amd64/gflags.lib) +set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/lib/native/retail/amd64/gflags.lib) # ================================================== GFLAGS ================================================== # For compatibility @@ -52,9 +52,9 @@ endif () # Edit these 4 lines to define paths to Snappy # set(SNAPPY_HOME $ENV{THIRDPARTY_HOME}/Snappy.Library) -set(SNAPPY_INCLUDE ${SNAPPY_HOME}/inc/inc) -set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/bin/debug/amd64/snappy.lib) -set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/bin/retail/amd64/snappy.lib) +set(SNAPPY_INCLUDE ${SNAPPY_HOME}/build/native/inc/inc) +set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/lib/native/debug/amd64/snappy.lib) +set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/lib/native/retail/amd64/snappy.lib) # For compatibility if(SNAPPY) @@ -63,11 +63,11 @@ endif () if (WITH_SNAPPY) message(STATUS "SNAPPY library is enabled") - + if(DEFINED ENV{SNAPPY_INCLUDE}) set(SNAPPY_INCLUDE $ENV{SNAPPY_INCLUDE}) endif() - + if(DEFINED ENV{SNAPPY_LIB_DEBUG}) set(SNAPPY_LIB_DEBUG $ENV{SNAPPY_LIB_DEBUG}) endif() @@ -75,7 +75,7 @@ if (WITH_SNAPPY) if(DEFINED ENV{SNAPPY_LIB_RELEASE}) set(SNAPPY_LIB_RELEASE $ENV{SNAPPY_LIB_RELEASE}) endif() - + set(SNAPPY_CXX_FLAGS -DSNAPPY) set(SNAPPY_LIBS debug ${SNAPPY_LIB_DEBUG} optimized ${SNAPPY_LIB_RELEASE}) @@ -91,14 +91,13 @@ endif () # Edit these 4 lines to define paths to LZ4 # set(LZ4_HOME $ENV{THIRDPARTY_HOME}/LZ4.Library) -set(LZ4_INCLUDE 
${LZ4_HOME}/inc/include) -set(LZ4_LIB_DEBUG ${LZ4_HOME}/bin/debug/amd64/lz4.lib) -set(LZ4_LIB_RELEASE ${LZ4_HOME}/bin/retail/amd64/lz4.lib) +set(LZ4_INCLUDE ${LZ4_HOME}/build/native/inc/inc) +set(LZ4_LIB_DEBUG ${LZ4_HOME}/lib/native/debug/amd64/lz4.lib) +set(LZ4_LIB_RELEASE ${LZ4_HOME}/lib/native/retail/amd64/lz4.lib) -# -# Don't touch these lines -# -if (DEFINED LZ4) + +# For compatibility +if (LZ4) set(WITH_LZ4 ON) endif () @@ -132,13 +131,9 @@ endif () # Edit these 4 lines to define paths to ZLIB # set(ZLIB_HOME $ENV{THIRDPARTY_HOME}/ZLIB.Library) -set(ZLIB_INCLUDE ${ZLIB_HOME}/inc/include) -set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/bin/debug/amd64/zlib.lib) -set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/bin/retail/amd64/zlib.lib) - -# -# Don't touch these lines -# +set(ZLIB_INCLUDE ${ZLIB_HOME}/build/native/inc/inc) +set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/lib/native/debug/amd64/zlib.lib) +set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/lib/native/retail/amd64/zlib.lib) # For compatibilty if (ZLIB) @@ -170,6 +165,9 @@ else () message(STATUS "ZLIB library is disabled") endif () +# ================================================== XPRESS ================================================== +# This makes use of built-in Windows API, no additional includes, links to a system lib + # For compatibilty if (XPRESS) set(WITH_XPRESS ON) @@ -186,20 +184,56 @@ else () message(STATUS "XPRESS is disabled") endif () + +# ================================================== ZSTD ================================================== # -# Edit these 4 lines to define paths to Jemalloc +# Edit these 4 lines to define paths to ZSTD # -set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library) -set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/inc/include) -set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/bin/debug/amd64/jemalloc.lib) -set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/bin/retail/amd64/jemalloc.lib) +set(ZSTD_HOME $ENV{THIRDPARTY_HOME}/ZSTD.Library) +set(ZSTD_INCLUDE ${ZSTD_HOME}/build/native/inc) +set(ZSTD_LIB_DEBUG 
${ZSTD_HOME}/lib/native/debug/amd64/libzstd_static.lib) +set(ZSTD_LIB_RELEASE ${ZSTD_HOME}/lib/native/retail/amd64/libzstd_static.lib) + +# For compatibility +if (ZSTD) + set(WITH_ZSTD ON) +endif () + +if (WITH_ZSTD) + message(STATUS "ZSTD library is enabled") + + if(DEFINED ENV{ZSTD_INCLUDE}) + set(ZSTD_INCLUDE $ENV{ZSTD_INCLUDE}) + endif() + + if(DEFINED ENV{ZSTD_LIB_DEBUG}) + set(ZSTD_LIB_DEBUG $ENV{ZSTD_LIB_DEBUG}) + endif() + + if(DEFINED ENV{ZSTD_LIB_RELEASE}) + set(ZSTD_LIB_RELEASE $ENV{ZSTD_LIB_RELEASE}) + endif() + + # ZSTD_STATIC_LINKING_ONLY only allows us to create an allocation functions override + # When jemalloc is in use + set(ZSTD_LIBS debug ${ZSTD_LIB_DEBUG} optimized ${ZSTD_LIB_RELEASE}) + + add_definitions(-DZSTD -DZSTD_STATIC_LINKING_ONLY) + include_directories(${ZSTD_INCLUDE}) + set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${ZSTD_LIBS}) +else () + message(STATUS "ZSTD library is disabled") +endif () -# ================================================== JEMALLOC ================================================== # -# Don't touch these lines +# Edit these 4 lines to define paths to Jemalloc # +set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library) +set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/build/native/inc) +set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/lib/native/debug/amd64/jemalloc.lib) +set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/lib/native/retail/amd64/jemalloc.lib) -# For compatibilty +# ================================================== JEMALLOC ================================================== if(JEMALLOC) set(WITH_JEMALLOC ON) endif() @@ -226,9 +260,7 @@ if (WITH_JEMALLOC) include_directories(${JEMALLOC_INCLUDE}) set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${JEMALLOC_LIBS}) set (ARTIFACT_SUFFIX "_je") - - set(WITH_JEMALLOC ON) - + else () set (ARTIFACT_SUFFIX "") message(STATUS "JEMALLOC library is disabled") diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 79a06b4fe..a992562a3 100644 --- a/tools/db_bench_tool.cc +++ 
b/tools/db_bench_tool.cc @@ -1941,27 +1941,28 @@ class Benchmark { return true; } - inline bool CompressSlice(const Slice& input, std::string* compressed) { + inline bool CompressSlice(const CompressionContext& compression_ctx, + const Slice& input, std::string* compressed) { bool ok = true; switch (FLAGS_compression_type_e) { case rocksdb::kSnappyCompression: - ok = Snappy_Compress(Options().compression_opts, input.data(), + ok = Snappy_Compress(compression_ctx, input.data(), input.size(), compressed); break; case rocksdb::kZlibCompression: - ok = Zlib_Compress(Options().compression_opts, 2, input.data(), + ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kBZip2Compression: - ok = BZip2_Compress(Options().compression_opts, 2, input.data(), + ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4Compression: - ok = LZ4_Compress(Options().compression_opts, 2, input.data(), + ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4HCCompression: - ok = LZ4HC_Compress(Options().compression_opts, 2, input.data(), + ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kXpressCompression: @@ -1969,8 +1970,8 @@ class Benchmark { input.size(), compressed); break; case rocksdb::kZSTD: - ok = ZSTD_Compress(Options().compression_opts, input.data(), - input.size(), compressed); + ok = ZSTD_Compress(compression_ctx, input.data(), + input.size(), compressed); break; default: ok = false; @@ -2052,7 +2053,8 @@ class Benchmark { const int len = FLAGS_block_size; std::string input_str(len, 'y'); std::string compressed; - bool result = CompressSlice(Slice(input_str), &compressed); + CompressionContext compression_ctx(FLAGS_compression_type_e, Options().compression_opts); + bool result = CompressSlice(compression_ctx, Slice(input_str), &compressed); if (!result) { fprintf(stdout, 
"WARNING: %s compression is not enabled\n", @@ -2194,10 +2196,11 @@ class Benchmark { merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), report_file_operations_(FLAGS_report_file_operations), #ifndef ROCKSDB_LITE - use_blob_db_(FLAGS_use_blob_db) { + use_blob_db_(FLAGS_use_blob_db) #else - use_blob_db_(false) { + use_blob_db_(false) #endif // !ROCKSDB_LITE + { // use simcache instead of cache if (FLAGS_simcache_size >= 0) { if (FLAGS_cache_numshardbits >= 1) { @@ -2843,11 +2846,13 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t produced = 0; bool ok = true; std::string compressed; + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); // Compress 1G while (ok && bytes < int64_t(1) << 30) { compressed.clear(); - ok = CompressSlice(input, &compressed); + ok = CompressSlice(compression_ctx, input, &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); @@ -2869,7 +2874,11 @@ void VerifyDBFromDB(std::string& truth_db_name) { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; - bool ok = CompressSlice(input, &compressed); + UncompressionContext uncompression_ctx(FLAGS_compression_type_e); + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); + + bool ok = CompressSlice(compression_ctx, input, &compressed); int64_t bytes = 0; int decompress_size; while (ok && bytes < 1024 * 1048576) { @@ -2889,7 +2898,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { break; } case rocksdb::kZlibCompression: - uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(), + uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -2899,12 +2909,14 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kLZ4Compression: - uncompressed = 
LZ4_Uncompress(compressed.data(), compressed.size(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; case rocksdb::kLZ4HCCompression: - uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -2914,7 +2926,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kZSTD: - uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(), + uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size); ok = uncompressed != nullptr; break; diff --git a/util/compression.h b/util/compression.h index 69df6695f..ef57ee11b 100644 --- a/util/compression.h +++ b/util/compression.h @@ -15,6 +15,7 @@ #include "rocksdb/options.h" #include "util/coding.h" +#include "util/compression_context_cache.h" #ifdef SNAPPY #include @@ -38,14 +39,215 @@ #if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+ #include #endif // ZSTD_VERSION_NUMBER >= 800 +namespace rocksdb { +// Need this for the context allocation override +// On windows we need to do this explicitly +#if (ZSTD_VERSION_NUMBER >= 500) +#if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \ + defined(ZSTD_STATIC_LINKING_ONLY) +#define ROCKSDB_ZSTD_CUSTOM_MEM +namespace port { +ZSTD_customMem GetJeZstdAllocationOverrides(); +} // namespace port +#endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && + // defined(ZSTD_STATIC_LINKING_ONLY) + +// Cached data represents a portion that can be re-used +// If, in the future we have more than one native context to +// cache we can arrange this as a tuple +class ZSTDUncompressCachedData { +public: + using ZSTDNativeContext = ZSTD_DCtx*; + ZSTDUncompressCachedData() {} + // Init from cache + ZSTDUncompressCachedData(const 
ZSTDUncompressCachedData& o) = delete; + ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; + ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT : + ZSTDUncompressCachedData() { + *this = std::move(o); + } + ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT { + assert(zstd_ctx_ == nullptr); + std::swap(zstd_ctx_,o.zstd_ctx_); + std::swap(cache_idx_,o.cache_idx_); + return *this; + } + ZSTDNativeContext Get() const { + return zstd_ctx_; + } + int64_t GetCacheIndex() const { + return cache_idx_; + } + void CreateIfNeeded() { + if (zstd_ctx_ == nullptr) { +#ifdef ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides()); +#else // ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createDCtx(); +#endif // ROCKSDB_ZSTD_CUSTOM_MEM + cache_idx_ = -1; + } + } + void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) { + zstd_ctx_ = o.zstd_ctx_; + cache_idx_ = idx; + } + ~ZSTDUncompressCachedData() { + if (zstd_ctx_ != nullptr && cache_idx_ == -1) { + ZSTD_freeDCtx(zstd_ctx_); + } + } +private: + ZSTDNativeContext zstd_ctx_ = nullptr; + int64_t cache_idx_ = -1; // -1 means this instance owns the context +}; +#endif // (ZSTD_VERSION_NUMBER >= 500) +} // namespace rocksdb #endif // ZSTD +#if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500) +namespace rocksdb { +class ZSTDUncompressCachedData { + void* padding; // unused +public: + using ZSTDNativeContext = void*; + ZSTDUncompressCachedData() {} + ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {} + ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; + ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; + ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; + ZSTDNativeContext Get() const { + return nullptr; + } + int64_t GetCacheIndex() const { + return -1; + } + void CreateIfNeeded() {} + void 
InitFromCache(const ZSTDUncompressCachedData&, int64_t) {} +}; +} // namespace rocksdb +#endif + #if defined(XPRESS) #include "port/xpress.h" #endif namespace rocksdb { +// Instantiate this class and pass it to the compression API below +class CompressionContext { +private: + const CompressionType type_; + const CompressionOptions opts_; + Slice dict_; +#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500) + ZSTD_CCtx* zstd_ctx_ = nullptr; + void CreateNativeContext() { + if (type_ == kZSTD) { +#ifdef ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides()); +#else // ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createCCtx(); +#endif // ROCKSDB_ZSTD_CUSTOM_MEM + } + } + void DestroyNativeContext() { + if (zstd_ctx_ != nullptr) { + ZSTD_freeCCtx(zstd_ctx_); + } + } +public: + // callable inside ZSTD_Compress + ZSTD_CCtx * ZSTDPreallocCtx() const { + assert(type_ == kZSTD); + return zstd_ctx_; + } +#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) +private: + void CreateNativeContext() {} + void DestroyNativeContext() {} +#endif //ZSTD && (ZSTD_VERSION_NUMBER >= 500) +public: + explicit CompressionContext(CompressionType comp_type) : + type_(comp_type) { + CreateNativeContext(); + } + CompressionContext(CompressionType comp_type, + const CompressionOptions& opts, + const Slice& comp_dict = Slice()) : + type_(comp_type), + opts_(opts), + dict_(comp_dict) { + CreateNativeContext(); + } + ~CompressionContext() { + DestroyNativeContext(); + } + CompressionContext(const CompressionContext&) = delete; + CompressionContext& operator=(const CompressionContext&) = delete; + + const CompressionOptions& options() const { + return opts_; + } + CompressionType type() const { + return type_; + } + const Slice& dict() const { + return dict_; + } + Slice& dict() { + return dict_; + } +}; + +// Instantiate this class and pass it to the uncompression API below +class UncompressionContext { +private: + CompressionType type_; + Slice dict_; +
CompressionContextCache* ctx_cache_ = nullptr; + ZSTDUncompressCachedData uncomp_cached_data_; +public: + struct NoCache {}; + // Do not use context cache, used by TableBuilder + UncompressionContext(NoCache, CompressionType comp_type) : + type_(comp_type) { + } + explicit UncompressionContext(CompressionType comp_type) : + UncompressionContext(comp_type, Slice()) { + } + UncompressionContext(CompressionType comp_type, const Slice& comp_dict) : + type_(comp_type), dict_(comp_dict) { + if (type_ == kZSTD) { + ctx_cache_ = CompressionContextCache::Instance(); + uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); + } + } + ~UncompressionContext() { + if (type_ == kZSTD && + uncomp_cached_data_.GetCacheIndex() != -1) { + assert(ctx_cache_ != nullptr); + ctx_cache_->ReturnCachedZSTDUncompressData( + uncomp_cached_data_.GetCacheIndex()); + } + } + UncompressionContext(const UncompressionContext&) = delete; + UncompressionContext& operator=(const UncompressionContext&) = delete; + + ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { + return uncomp_cached_data_.Get(); + } + CompressionType type() const { + return type_; + } + const Slice& dict() const { + return dict_; + } + Slice& dict() { + return dict_; + } +}; + inline bool Snappy_Supported() { #ifdef SNAPPY return true; @@ -162,7 +364,7 @@ inline std::string CompressionTypeToString(CompressionType compression_type) { // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // start of compressed block. Snappy format is the same as version 1. -inline bool Snappy_Compress(const CompressionOptions& /*opts*/, +inline bool Snappy_Compress(const CompressionContext& /*ctx*/, const char* input, size_t length, ::std::string* output) { #ifdef SNAPPY @@ -229,10 +431,9 @@ inline bool GetDecompressedSizeInfo(const char** input_data, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool Zlib_Compress(const CompressionOptions& opts, +inline bool Zlib_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef ZLIB if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -255,24 +456,24 @@ inline bool Zlib_Compress(const CompressionOptions& opts, // The default value is 8. See zconf.h for more details. static const int memLevel = 8; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = Z_DEFAULT_COMPRESSION; } else { - level = opts.level; + level = ctx.options().level; } z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, opts.window_bits, - memLevel, opts.strategy); + int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits, + memLevel, ctx.options().strategy); if (st != Z_OK) { return false; } - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary st = deflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + &_stream, reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { deflateEnd(&_stream); return false; @@ -300,12 +501,11 @@ inline bool Zlib_Compress(const CompressionOptions& opts, deflateEnd(&_stream); return compressed; #else - (void)opts; + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -316,10 +516,10 @@ inline bool Zlib_Compress(const CompressionOptions& opts, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* Zlib_Uncompress(const char* input_data, size_t input_length, +inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, int* decompress_size, uint32_t compress_format_version, - const Slice& compression_dict = Slice(), int windowBits = -14) { #ifdef ZLIB uint32_t output_len = 0; @@ -349,11 +549,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, return nullptr; } - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary st = inflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + &_stream, reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { return nullptr; } @@ -406,11 +606,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, inflateEnd(&_stream); return output; #else + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; (void)compress_format_version; - (void)compression_dict; (void)windowBits; return nullptr; #endif @@ -420,7 +620,7 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format -inline bool BZip2_Compress(const CompressionOptions& /*opts*/, +inline bool BZip2_Compress(const CompressionContext& /*ctx*/, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef BZIP2 @@ -567,10 +767,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4_Compress(const CompressionOptions& /*opts*/, +inline bool LZ4_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -596,9 +795,9 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, int outlen; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_stream_t* stream = LZ4_createStream(); - if (compression_dict.size()) { - LZ4_loadDict(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_loadDict(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } #if LZ4_VERSION_NUMBER >= 10700 // r129+ outlen = @@ -621,11 +820,11 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -636,10 +835,10 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* LZ4_Uncompress(const char* input_data, size_t input_length, +inline char* LZ4_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, int* decompress_size, - uint32_t compress_format_version, - const Slice& compression_dict = Slice()) { + uint32_t compress_format_version) { #ifdef LZ4 uint32_t output_len = 0; if (compress_format_version == 2) { @@ -662,9 +861,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, char* output = new char[output_len]; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); - if (compression_dict.size()) { - LZ4_setStreamDecode(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_setStreamDecode(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } *decompress_size = LZ4_decompress_safe_continue( stream, input_data, output, static_cast(input_length), @@ -683,11 +882,11 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, assert(*decompress_size == static_cast(output_len)); return output; #else // LZ4 + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; (void)compress_format_version; - (void)compression_dict; return nullptr; #endif } @@ -698,10 +897,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool LZ4HC_Compress(const CompressionOptions& opts, +inline bool LZ4HC_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -726,17 +924,17 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, int outlen; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = 0; // lz4hc.h says any value < 1 will be sanitized to default } else { - level = opts.level; + level = ctx.options().level; } #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_resetStreamHC(stream, level); const char* compression_dict_data = - compression_dict.size() > 0 ? compression_dict.data() : nullptr; - size_t compression_dict_size = compression_dict.size(); + ctx.dict().size() > 0 ? ctx.dict().data() : nullptr; + size_t compression_dict_size = ctx.dict().size(); LZ4_loadDictHC(stream, compression_dict_data, static_cast(compression_dict_size)); @@ -767,12 +965,11 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)opts; + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -804,9 +1001,8 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/, // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { +inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, + size_t length, ::std::string* output) { #ifdef ZSTD if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -818,21 +1014,21 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, size_t compressBound = ZSTD_compressBound(length); output->resize(static_cast(output_header_len + compressBound)); - size_t outlen; + size_t outlen = 0; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // https://github.com/facebook/zstd/issues/1148 level = 3; } else { - level = opts.level; + level = ctx.options().level; } #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_CCtx* context = ZSTD_createCCtx(); + ZSTD_CCtx* context = ctx.ZSTDPreallocCtx(); + assert(context != nullptr); outlen = ZSTD_compress_usingDict( context, &(*output)[output_header_len], compressBound, input, length, - compression_dict.data(), compression_dict.size(), level); - ZSTD_freeCCtx(context); + ctx.dict().data(), ctx.dict().size(), level); #else // up to v0.4.x outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, length, level); @@ -843,20 +1039,19 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, output->resize(output_header_len + outlen); return true; #else // ZSTD - (void)opts; + (void)ctx; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, - int* decompress_size, - const Slice& compression_dict = Slice()) { +inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, + int* decompress_size) { #ifdef ZSTD uint32_t output_len = 0; if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, @@ -867,11 +1062,11 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, char* output = new char[output_len]; size_t actual_output_length; #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_DCtx* context = ZSTD_createDCtx(); + ZSTD_DCtx* context = ctx.GetZSTDContext(); + assert(context != nullptr); actual_output_length = ZSTD_decompress_usingDict( context, output, output_len, input_data, input_length, - compression_dict.data(), compression_dict.size()); - ZSTD_freeDCtx(context); + ctx.dict().data(), ctx.dict().size()); #else // up to v0.4.x actual_output_length = ZSTD_decompress(output, output_len, input_data, input_length); @@ -880,10 +1075,10 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, *decompress_size = static_cast(actual_output_length); return output; #else // ZSTD + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; - (void)compression_dict; return nullptr; #endif } diff --git a/util/compression_context_cache.cc b/util/compression_context_cache.cc new file mode 100644 index 000000000..cd34eb1de --- /dev/null +++ b/util/compression_context_cache.cc @@ -0,0 +1,111 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. +// + +#include "util/compression_context_cache.h" + +#include "util/compression.h" +#include "util/core_local.h" + +#include + +namespace rocksdb { +namespace compression_cache { + +void* const SentinelValue = nullptr; +// Cache ZSTD uncompression contexts for reads +// if needed we can add ZSTD compression context caching +// which is currently not done since BlockBasedTableBuilder +// simply creates one compression context per new SST file. +struct ZSTDCachedData { + // We choose to cache the below structure instead of a ptr + // because we want to avoid a) native types leak b) make + // cache use transparent for the user + ZSTDUncompressCachedData uncomp_cached_data_; + std::atomic zstd_uncomp_sentinel_; + + char padding[(CACHE_LINE_SIZE - + (sizeof(ZSTDUncompressCachedData) + + sizeof(std::atomic)) % + CACHE_LINE_SIZE)]; // unused padding field + + ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {} + ZSTDCachedData(const ZSTDCachedData&) = delete; + ZSTDCachedData& operator=(const ZSTDCachedData&) = delete; + + ZSTDUncompressCachedData GetUncompressData(int64_t idx) { + ZSTDUncompressCachedData result; + void* expected = &uncomp_cached_data_; + if (zstd_uncomp_sentinel_.compare_exchange_strong(expected, SentinelValue)) { + uncomp_cached_data_.CreateIfNeeded(); + result.InitFromCache(uncomp_cached_data_, idx); + } else { + // Creates one time use data + result.CreateIfNeeded(); + } + return result; + } + // Return the entry back into circulation + // This is executed only when we successfully obtained + // in the first place + void ReturnUncompressData() { + if (zstd_uncomp_sentinel_.exchange(&uncomp_cached_data_) != SentinelValue) { + // Means we are returning while not having it acquired.
+ assert(false); + } + } +}; +static_assert(sizeof(ZSTDCachedData) % CACHE_LINE_SIZE == 0, "Expected CACHE_LINE_SIZE alignment"); +} // compression_cache + +using namespace compression_cache; + +class CompressionContextCache::Rep { +public: + Rep() { + } + ZSTDUncompressCachedData GetZSTDUncompressData() { + auto p = per_core_uncompr_.AccessElementAndIndex(); + int64_t idx = static_cast(p.second); + return p.first->GetUncompressData(idx); + } + void ReturnZSTDUncompressData(int64_t idx) { + assert(idx >= 0); + auto* cn = per_core_uncompr_.AccessAtCore(static_cast(idx)); + cn->ReturnUncompressData(); + } +private: + CoreLocalArray per_core_uncompr_; +}; + +CompressionContextCache::CompressionContextCache() : + rep_(new Rep()) { +} + +CompressionContextCache* CompressionContextCache::Instance() { + static CompressionContextCache instance; + return &instance; +} + +void CompressionContextCache::InitSingleton() { + Instance(); +} + +ZSTDUncompressCachedData CompressionContextCache::GetCachedZSTDUncompressData() { + return rep_->GetZSTDUncompressData(); +} + +void CompressionContextCache::ReturnCachedZSTDUncompressData(int64_t idx) { + rep_->ReturnZSTDUncompressData(idx); +} + +CompressionContextCache::~CompressionContextCache() { + delete rep_; +} + +} diff --git a/util/compression_context_cache.h b/util/compression_context_cache.h new file mode 100644 index 000000000..2b37cfbd8 --- /dev/null +++ b/util/compression_context_cache.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+// +// Compression context cache allows caching of compression/uncompression contexts +// This helps with Random Read latencies and reduces CPU utilization +// Caching is implemented using CoreLocal facility. Compression/Uncompression +// instances are cached on a per core basis using CoreLocalArray. A borrowed +// instance is atomically replaced with a sentinel value while it is in use. +// If it turns out that another thread is already using the instance we still +// create one on the heap which is later destroyed. + +#pragma once + +#include + +namespace rocksdb { +class ZSTDUncompressCachedData; + +class CompressionContextCache { +public: + // Singleton + static CompressionContextCache* Instance(); + static void InitSingleton(); + CompressionContextCache(const CompressionContextCache&) = delete; + CompressionContextCache& operator=(const CompressionContextCache&) = delete; + + ZSTDUncompressCachedData GetCachedZSTDUncompressData(); + void ReturnCachedZSTDUncompressData(int64_t idx); + +private: + // Singleton + CompressionContextCache(); + ~CompressionContextCache(); + + class Rep; + Rep* rep_; +}; + +} diff --git a/util/thread_local.h b/util/thread_local.h index 175f1cca4..5dad72921 100644 --- a/util/thread_local.h +++ b/util/thread_local.h @@ -45,6 +45,9 @@ class ThreadLocalPtr { public: explicit ThreadLocalPtr(UnrefHandler handler = nullptr); + ThreadLocalPtr(const ThreadLocalPtr&) = delete; + ThreadLocalPtr& operator=(const ThreadLocalPtr&) = delete; + ~ThreadLocalPtr(); // Return the current pointer stored in thread local diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index a4a1162bd..d2857f3a1 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -773,9 +773,9 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, } StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); CompressionType ct = bdb_options_.compression; - CompressionOptions
compression_opts; - CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat, - Slice(), compression_output); + CompressionContext compression_ctx(ct); + CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat, + compression_output); return *compression_output; } @@ -1120,9 +1120,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, { StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS); + UncompressionContext uncompression_ctx(bfile->compression()); s = UncompressBlockContentsForCompressionType( + uncompression_ctx, blob_value.data(), blob_value.size(), &contents, - kBlockBasedTableVersionFormat, Slice(), bfile->compression(), + kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); } value->PinSelf(contents.data); diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index c527bc9d5..aa45ebee6 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -205,9 +205,11 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, if (compression != kNoCompression && (show_uncompressed_blob != DisplayType::kNone || show_summary)) { BlockContents contents; + UncompressionContext uncompression_ctx(compression); s = UncompressBlockContentsForCompressionType( + uncompression_ctx, slice.data() + key_size, value_size, &contents, - 2 /*compress_format_version*/, Slice(), compression, + 2 /*compress_format_version*/, ImmutableCFOptions(Options())); if (!s.ok()) { return s; diff --git a/utilities/column_aware_encoding_util.cc b/utilities/column_aware_encoding_util.cc index 6d15e77ad..1dffd32ab 100644 --- a/utilities/column_aware_encoding_util.cc +++ b/utilities/column_aware_encoding_util.cc @@ -84,16 +84,16 @@ void ColumnAwareEncodingReader::DecodeBlocks( auto& slice_final_with_bit = block; uint32_t format_version = 2; - Slice compression_dict; BlockContents contents; const char* content_ptr; CompressionType 
type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != kNoCompression) { - UncompressBlockContents(slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, - format_version, compression_dict, ioptions); + format_version, ioptions); content_ptr = contents.data.data(); } else { content_ptr = slice_final_with_bit.data(); @@ -166,16 +166,17 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat( for (auto& block : *blocks) { auto& slice_final_with_bit = block; uint32_t format_version = 2; - Slice compression_dict; BlockContents contents; std::string decoded_content; CompressionType type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != kNoCompression) { - UncompressBlockContents(slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + UncompressBlockContents(uncompression_ctx, + slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, - format_version, compression_dict, ioptions); + format_version, ioptions); decoded_content = std::string(contents.data.data(), contents.data.size()); } else { decoded_content = std::move(slice_final_with_bit); @@ -244,12 +245,11 @@ namespace { void CompressDataBlock(const std::string& output_content, Slice* slice_final, CompressionType* type, std::string* compressed_output) { - CompressionOptions compression_opts; + CompressionContext compression_ctx(*type); uint32_t format_version = 2; // hard-coded version - Slice compression_dict; *slice_final = - CompressBlock(output_content, compression_opts, type, format_version, - compression_dict, compressed_output); + CompressBlock(output_content, compression_ctx, type, format_version, + compressed_output); } } // namespace