diff --git a/CMakeLists.txt b/CMakeLists.txt index cadb0a8cf..6fedf455a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -322,14 +322,24 @@ if(DEFINED USE_RTTI) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -DROCKSDB_USE_RTTI") else() - message(STATUS "Disabling RTTI") - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + if(MSVC) + message(STATUS "Disabling RTTI in Release builds. Always on in Debug.") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-") + else() + message(STATUS "Disabling RTTI in Release builds") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-rtti") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + endif() endif() else() message(STATUS "Enabling RTTI in Debug builds only (default)") set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -DROCKSDB_USE_RTTI") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + if(MSVC) + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} /GR-") + else() + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -fno-rtti") + endif() endif() if(MSVC) @@ -574,6 +584,7 @@ set(SOURCES util/coding.cc util/compaction_job_stats_impl.cc util/comparator.cc + util/compression_context_cache.cc util/concurrent_arena.cc util/crc32c.cc util/delete_scheduler.cc diff --git a/TARGETS b/TARGETS index d130f512b..3e8a2b78d 100644 --- a/TARGETS +++ b/TARGETS @@ -200,6 +200,7 @@ cpp_library( "util/coding.cc", "util/compaction_job_stats_impl.cc", "util/comparator.cc", + "util/compression_context_cache.cc", "util/concurrent_arena.cc", "util/crc32c.cc", "util/delete_scheduler.cc", diff --git a/port/win/env_default.cc b/port/win/env_default.cc index 52a984f74..b99cf9e94 100644 --- a/port/win/env_default.cc +++ b/port/win/env_default.cc @@ -11,16 +11,13 @@ #include #include "port/win/env_win.h" +#include "util/compression_context_cache.h" +#include "util/thread_local.h" namespace rocksdb { namespace port { -// We choose to create this on the heap and using std::once for the following -// reasons -// 1) Currently available MS compiler does not implement atomic C++11 -// initialization of -// function local statics -// 2) We choose not to destroy the env because joining the threads from the +// We choose not to destroy the env because joining the threads from the // system loader // which destroys the statics (same as from DLLMain) creates a system loader // dead-lock. 
@@ -29,11 +26,12 @@ namespace { std::once_flag winenv_once_flag; Env* envptr; }; - } Env* Env::Default() { using namespace port; + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); return envptr; } diff --git a/port/win/win_jemalloc.cc b/port/win/win_jemalloc.cc index fc46e189c..4f84c0734 100644 --- a/port/win/win_jemalloc.cc +++ b/port/win/win_jemalloc.cc @@ -14,6 +14,25 @@ #include #include "jemalloc/jemalloc.h" +#if defined(ZSTD) && defined(ZSTD_STATIC_LINKING_ONLY) +#include +#if (ZSTD_VERSION_NUMBER >= 500) +namespace rocksdb { +namespace port { +void* JemallocAllocateForZSTD(void* /* opaque */, size_t size) { + return je_malloc(size); +} +void JemallocDeallocateForZSTD(void* /* opaque */, void* address) { + je_free(address); +} +ZSTD_customMem GetJeZstdAllocationOverrides() { + return { JemallocAllocateForZSTD, JemallocDeallocateForZSTD, nullptr }; +} +} // namespace port +} // namespace rocksdb +#endif // (ZSTD_VERSION_NUMBER >= 500) +#endif // defined(ZSTD) defined(ZSTD_STATIC_LINKING_ONLY) + // Global operators to be replaced by a linker when this file is // a part of the build diff --git a/src.mk b/src.mk index 48958784d..047eaa3f2 100644 --- a/src.mk +++ b/src.mk @@ -129,6 +129,7 @@ LIB_SOURCES = \ util/coding.cc \ util/compaction_job_stats_impl.cc \ util/comparator.cc \ + util/compression_context_cache.cc \ util/concurrent_arena.cc \ util/crc32c.cc \ util/delete_scheduler.cc \ diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index a742f6327..43c1e999c 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -104,19 +104,19 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { // format_version is the block format as defined in include/rocksdb/table.h Slice CompressBlock(const Slice& raw, - const CompressionOptions& compression_options, + const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, - const Slice& compression_dict, std::string* compressed_output) { - if (*type == kNoCompression) { + *type = compression_ctx.type(); + if (compression_ctx.type() == kNoCompression) { return raw; } // Will return compressed block contents if (1) the compression method is // supported in this platform and (2) the compression rate is "good enough". - switch (*type) { + switch (compression_ctx.type()) { case kSnappyCompression: - if (Snappy_Compress(compression_options, raw.data(), raw.size(), + if (Snappy_Compress(compression_ctx, raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; @@ -124,16 +124,16 @@ Slice CompressBlock(const Slice& raw, break; // fall back to no compression. case kZlibCompression: if (Zlib_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kZlibCompression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. 
case kBZip2Compression: if (BZip2_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kBZip2Compression, format_version), raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { @@ -142,18 +142,18 @@ Slice CompressBlock(const Slice& raw, break; // fall back to no compression. case kLZ4Compression: if (LZ4_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kLZ4Compression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } break; // fall back to no compression. case kLZ4HCCompression: if (LZ4HC_Compress( - compression_options, + compression_ctx, GetCompressFormatForVersion(kLZ4HCCompression, format_version), - raw.data(), raw.size(), compressed_output, compression_dict) && + raw.data(), raw.size(), compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -167,8 +167,8 @@ Slice CompressBlock(const Slice& raw, break; case kZSTD: case kZSTDNotFinalCompression: - if (ZSTD_Compress(compression_options, raw.data(), raw.size(), - compressed_output, compression_dict) && + if (ZSTD_Compress(compression_ctx, raw.data(), raw.size(), + compressed_output) && GoodCompressionRatio(compressed_output->size(), raw.size())) { return *compressed_output; } @@ -261,10 +261,10 @@ struct BlockBasedTableBuilder::Rep { PartitionedIndexBuilder* p_index_builder_ = nullptr; std::string last_key; - const CompressionType compression_type; - const CompressionOptions compression_opts; - // Data for presetting the compression library's dictionary, or nullptr. - const std::string* compression_dict; + // Compression dictionary or nullptr + const std::string* compression_dict; + CompressionContext compression_ctx; + std::unique_ptr verify_ctx; TableProperties props; bool closed = false; // Either Finish() or Abandon() has been called. 
@@ -306,9 +306,8 @@ struct BlockBasedTableBuilder::Rep { table_options.use_delta_encoding), range_del_block(1 /* block_restart_interval */), internal_prefix_transform(_moptions.prefix_extractor.get()), - compression_type(_compression_type), - compression_opts(_compression_opts), compression_dict(_compression_dict), + compression_ctx(_compression_type, _compression_opts), compressed_cache_key_prefix_size(0), flush_block_policy( table_options.flush_block_policy_factory->NewFlushBlockPolicy( @@ -342,6 +341,16 @@ struct BlockBasedTableBuilder::Rep { new BlockBasedTablePropertiesCollector( table_options.index_type, table_options.whole_key_filtering, _moptions.prefix_extractor != nullptr)); + if (table_options.verify_compression) { + verify_ctx.reset(new UncompressionContext(UncompressionContext::NoCache(), + compression_ctx.type())); + } + } + + Rep(const Rep&) = delete; + Rep& operator=(const Rep&) = delete; + + ~Rep() { } }; @@ -480,7 +489,7 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, assert(ok()); Rep* r = rep_; - auto type = r->compression_type; + auto type = r->compression_ctx.type(); Slice block_contents; bool abort_compression = false; @@ -490,12 +499,23 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (raw_block_contents.size() < kCompressionSizeLimit) { Slice compression_dict; if (is_data_block && r->compression_dict && r->compression_dict->size()) { - compression_dict = *r->compression_dict; + r->compression_ctx.dict() = *r->compression_dict; + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = *r->compression_dict; + } + } else { + // Clear dictionary + r->compression_ctx.dict() = Slice(); + if (r->table_options.verify_compression) { + assert(r->verify_ctx != nullptr); + r->verify_ctx->dict() = Slice(); + } } - block_contents = CompressBlock(raw_block_contents, r->compression_opts, - &type, r->table_options.format_version, - compression_dict, &r->compressed_output); + block_contents = CompressBlock(raw_block_contents, r->compression_ctx, + &type, r->table_options.format_version, + &r->compressed_output); // Some of the compression algorithms are known to be unreliable. If // the verify_compression flag is set then try to de-compress the @@ -503,9 +523,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents, if (type != kNoCompression && r->table_options.verify_compression) { // Retrieve the uncompressed contents into a new buffer BlockContents contents; - Status stat = UncompressBlockContentsForCompressionType( + Status stat = UncompressBlockContentsForCompressionType(*r->verify_ctx, block_contents.data(), block_contents.size(), &contents, - r->table_options.format_version, compression_dict, type, + r->table_options.format_version, r->ioptions); if (stat.ok()) { @@ -739,7 +759,7 @@ Status BlockBasedTableBuilder::Finish() { r->props.merge_operator_name = r->ioptions.merge_operator != nullptr ? r->ioptions.merge_operator->Name() : "nullptr"; - r->props.compression_name = CompressionTypeToString(r->compression_type); + r->props.compression_name = CompressionTypeToString(r->compression_ctx.type()); r->props.prefix_extractor_name = r->moptions.prefix_extractor != nullptr ? 
r->moptions.prefix_extractor->Name() diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index a0ba87f14..42dc953f0 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -19,6 +19,7 @@ #include "rocksdb/options.h" #include "rocksdb/status.h" #include "table/table_builder.h" +#include "util/compression.h" namespace rocksdb { @@ -53,6 +54,10 @@ class BlockBasedTableBuilder : public TableBuilder { // REQUIRES: Either Finish() or Abandon() has been called. ~BlockBasedTableBuilder(); + // No copying allowed + BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete; + BlockBasedTableBuilder& operator=(const BlockBasedTableBuilder&) = delete; + // Add key,value to the table being constructed. // REQUIRES: key is after any previously added key according to comparator. // REQUIRES: Finish(), Abandon() have not been called @@ -115,16 +120,11 @@ class BlockBasedTableBuilder : public TableBuilder { // Some compression libraries fail when the raw size is bigger than int. If // uncompressed size is bigger than kCompressionSizeLimit, don't compress it const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); - - // No copying allowed - BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete; - void operator=(const BlockBasedTableBuilder&) = delete; }; Slice CompressBlock(const Slice& raw, - const CompressionOptions& compression_options, + const CompressionContext& compression_ctx, CompressionType* type, uint32_t format_version, - const Slice& compression_dict, std::string* compressed_output); } // namespace rocksdb diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 2462b880f..d384981e7 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1104,9 +1104,12 @@ Status BlockBasedTable::GetDataBlockFromCache( // Retrieve the uncompressed contents into a new buffer BlockContents contents; - s = UncompressBlockContents(compressed_block->data(), + UncompressionContext uncompression_ctx(compressed_block->compression_type(), + compression_dict); + s = UncompressBlockContents(uncompression_ctx, + compressed_block->data(), compressed_block->size(), &contents, - format_version, compression_dict, + format_version, ioptions); // Insert uncompressed block into block cache @@ -1182,8 +1185,10 @@ Status BlockBasedTable::PutDataBlockToCache( BlockContents contents; Statistics* statistics = ioptions.statistics; if (raw_block->compression_type() != kNoCompression) { - s = UncompressBlockContents(raw_block->data(), raw_block->size(), &contents, - format_version, compression_dict, ioptions); + UncompressionContext uncompression_ctx(raw_block->compression_type(), compression_dict); + s = UncompressBlockContents(uncompression_ctx, raw_block->data(), + raw_block->size(), &contents, + format_version, ioptions); } if (!s.ok()) { delete raw_block; diff --git a/table/block_fetcher.cc b/table/block_fetcher.cc index fd01ac1c6..6a2f5f995 100644 --- a/table/block_fetcher.cc +++ b/table/block_fetcher.cc @@ -225,8 +225,10 @@ Status BlockFetcher::ReadBlockContents() { if (do_uncompress_ && compression_type != kNoCompression) { // compressed page, uncompress, update cache - status_ = UncompressBlockContents(slice_.data(), block_size_, contents_, - footer_.version(), compression_dict_, + UncompressionContext uncompression_ctx(compression_type, compression_dict_); + status_ = UncompressBlockContents(uncompression_ctx, + slice_.data(), block_size_, contents_, + footer_.version(),
ioptions_); } else { GetBlockContents(); diff --git a/table/format.cc b/table/format.cc index 534d8c466..3a8f80e34 100644 --- a/table/format.cc +++ b/table/format.cc @@ -264,18 +264,17 @@ Status ReadFooterFromFile(RandomAccessFileReader* file, return Status::OK(); } -Status UncompressBlockContentsForCompressionType( +Status UncompressBlockContentsForCompressionType(const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, - uint32_t format_version, const Slice& compression_dict, - CompressionType compression_type, const ImmutableCFOptions &ioptions) { + uint32_t format_version, const ImmutableCFOptions &ioptions) { std::unique_ptr<char[]> ubuf; - assert(compression_type != kNoCompression && "Invalid compression type"); + assert(uncompression_ctx.type() != kNoCompression && "Invalid compression type"); StopWatchNano timer(ioptions.env, ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); int decompress_size = 0; - switch (compression_type) { + switch (uncompression_ctx.type()) { case kSnappyCompression: { size_t ulength = 0; static char snappy_corrupt_msg[] = @@ -291,10 +290,9 @@ Status UncompressBlockContentsForCompressionType( break; } case kZlibCompression: - ubuf.reset(Zlib_Uncompress( + ubuf.reset(Zlib_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kZlibCompression, format_version), - compression_dict)); + GetCompressFormatForVersion(kZlibCompression, format_version))); if (!ubuf) { static char zlib_corrupt_msg[] = "Zlib not supported or corrupted Zlib compressed block contents"; @@ -316,10 +314,9 @@ Status UncompressBlockContentsForCompressionType( BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4Compression: - ubuf.reset(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4Compression, format_version), - compression_dict)); + GetCompressFormatForVersion(kLZ4Compression, format_version))); if (!ubuf) { static char lz4_corrupt_msg[] = "LZ4 not supported or corrupted LZ4 compressed block contents"; @@ -329,10 +326,9 @@ Status UncompressBlockContentsForCompressionType( BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); break; case kLZ4HCCompression: - ubuf.reset(LZ4_Uncompress( + ubuf.reset(LZ4_Uncompress(uncompression_ctx, data, n, &decompress_size, - GetCompressFormatForVersion(kLZ4HCCompression, format_version), - compression_dict)); + GetCompressFormatForVersion(kLZ4HCCompression, format_version))); if (!ubuf) { static char lz4hc_corrupt_msg[] = "LZ4HC not supported or corrupted LZ4HC compressed block contents"; @@ -353,7 +349,7 @@ Status UncompressBlockContentsForCompressionType( break; case kZSTD: case kZSTDNotFinalCompression: - ubuf.reset(ZSTD_Uncompress(data, n, &decompress_size, compression_dict)); + ubuf.reset(ZSTD_Uncompress(uncompression_ctx, data, n, &decompress_size)); if (!ubuf) { static char zstd_corrupt_msg[] = "ZSTD not supported or corrupted ZSTD compressed block contents"; @@ -383,14 +379,15 @@ Status UncompressBlockContentsForCompressionType( // buffer is returned via 'result' and it is upto the caller to // free this buffer.
// format_version is the block format as defined in include/rocksdb/table.h -Status UncompressBlockContents(const char* data, size_t n, +Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, + const char* data, size_t n, BlockContents* contents, uint32_t format_version, - const Slice& compression_dict, const ImmutableCFOptions &ioptions) { assert(data[n] != kNoCompression); + assert(data[n] == uncompression_ctx.type()); return UncompressBlockContentsForCompressionType( - data, n, contents, format_version, compression_dict, - (CompressionType)data[n], ioptions); + uncompression_ctx, data, n, contents, + format_version, ioptions); } } // namespace rocksdb diff --git a/table/format.h b/table/format.h index f3737fc21..15f167be0 100644 --- a/table/format.h +++ b/table/format.h @@ -228,19 +228,20 @@ extern Status ReadBlockContents( // free this buffer. // For description of compress_format_version and possible values, see // util/compression.h -extern Status UncompressBlockContents(const char* data, size_t n, +extern Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, + const char* data, size_t n, BlockContents* contents, uint32_t compress_format_version, - const Slice& compression_dict, - const ImmutableCFOptions& ioptions); + const ImmutableCFOptions &ioptions); // This is an extension to UncompressBlockContents that accepts // a specific compression type. This is used by un-wrapped blocks // with no compression header. extern Status UncompressBlockContentsForCompressionType( + const UncompressionContext& uncompression_ctx, const char* data, size_t n, BlockContents* contents, - uint32_t compress_format_version, const Slice& compression_dict, - CompressionType compression_type, const ImmutableCFOptions& ioptions); + uint32_t compress_format_version, + const ImmutableCFOptions& ioptions); // Implementation details follow. 
Clients should ignore, diff --git a/thirdparty.inc b/thirdparty.inc index 26c50d99f..f40b81fec 100644 --- a/thirdparty.inc +++ b/thirdparty.inc @@ -12,9 +12,9 @@ set (THIRDPARTY_LIBS "") # Initialization, don't touch # Defaults # set(GFLAGS_HOME $ENV{THIRDPARTY_HOME}/Gflags.Library) -set(GFLAGS_INCLUDE ${GFLAGS_HOME}/inc/include) -set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/bin/debug/amd64/gflags.lib) -set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/bin/retail/amd64/gflags.lib) +set(GFLAGS_INCLUDE ${GFLAGS_HOME}/build/native/include) +set(GFLAGS_LIB_DEBUG ${GFLAGS_HOME}/lib/native/debug/amd64/gflags.lib) +set(GFLAGS_LIB_RELEASE ${GFLAGS_HOME}/lib/native/retail/amd64/gflags.lib) # ================================================== GFLAGS ================================================== # For compatibility @@ -52,9 +52,9 @@ endif () # Edit these 4 lines to define paths to Snappy # set(SNAPPY_HOME $ENV{THIRDPARTY_HOME}/Snappy.Library) -set(SNAPPY_INCLUDE ${SNAPPY_HOME}/inc/inc) -set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/bin/debug/amd64/snappy.lib) -set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/bin/retail/amd64/snappy.lib) +set(SNAPPY_INCLUDE ${SNAPPY_HOME}/build/native/inc/inc) +set(SNAPPY_LIB_DEBUG ${SNAPPY_HOME}/lib/native/debug/amd64/snappy.lib) +set(SNAPPY_LIB_RELEASE ${SNAPPY_HOME}/lib/native/retail/amd64/snappy.lib) # For compatibility if(SNAPPY) @@ -63,11 +63,11 @@ endif () if (WITH_SNAPPY) message(STATUS "SNAPPY library is enabled") - + if(DEFINED ENV{SNAPPY_INCLUDE}) set(SNAPPY_INCLUDE $ENV{SNAPPY_INCLUDE}) endif() - + if(DEFINED ENV{SNAPPY_LIB_DEBUG}) set(SNAPPY_LIB_DEBUG $ENV{SNAPPY_LIB_DEBUG}) endif() @@ -75,7 +75,7 @@ if (WITH_SNAPPY) if(DEFINED ENV{SNAPPY_LIB_RELEASE}) set(SNAPPY_LIB_RELEASE $ENV{SNAPPY_LIB_RELEASE}) endif() - + set(SNAPPY_CXX_FLAGS -DSNAPPY) set(SNAPPY_LIBS debug ${SNAPPY_LIB_DEBUG} optimized ${SNAPPY_LIB_RELEASE}) @@ -91,14 +91,13 @@ endif () # Edit these 4 lines to define paths to LZ4 # set(LZ4_HOME $ENV{THIRDPARTY_HOME}/LZ4.Library) -set(LZ4_INCLUDE ${LZ4_HOME}/inc/include) -set(LZ4_LIB_DEBUG ${LZ4_HOME}/bin/debug/amd64/lz4.lib) -set(LZ4_LIB_RELEASE ${LZ4_HOME}/bin/retail/amd64/lz4.lib) +set(LZ4_INCLUDE ${LZ4_HOME}/build/native/inc/inc) +set(LZ4_LIB_DEBUG ${LZ4_HOME}/lib/native/debug/amd64/lz4.lib) +set(LZ4_LIB_RELEASE ${LZ4_HOME}/lib/native/retail/amd64/lz4.lib) -# -# Don't touch these lines -# -if (DEFINED LZ4) + +# For compatibility +if (LZ4) set(WITH_LZ4 ON) endif () @@ -132,13 +131,9 @@ endif () # Edit these 4 lines to define paths to ZLIB # set(ZLIB_HOME $ENV{THIRDPARTY_HOME}/ZLIB.Library) -set(ZLIB_INCLUDE ${ZLIB_HOME}/inc/include) -set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/bin/debug/amd64/zlib.lib) -set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/bin/retail/amd64/zlib.lib) - -# -# Don't touch these lines -# +set(ZLIB_INCLUDE ${ZLIB_HOME}/build/native/inc/inc) +set(ZLIB_LIB_DEBUG ${ZLIB_HOME}/lib/native/debug/amd64/zlib.lib) +set(ZLIB_LIB_RELEASE ${ZLIB_HOME}/lib/native/retail/amd64/zlib.lib) # For compatibilty if (ZLIB) @@ -170,6 +165,9 @@ else () message(STATUS "ZLIB library is disabled") endif () +# ================================================== XPRESS ================================================== +# This makes use of built-in Windows API, no additional includes, links to a system lib + # For compatibilty if (XPRESS) set(WITH_XPRESS ON) @@ -186,20 +184,56 @@ else () message(STATUS "XPRESS is disabled") endif () + +# ================================================== ZSTD ================================================== # -# Edit these 4 lines to define paths to Jemalloc +# Edit these 4 
lines to define paths to ZSTD # -set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library) -set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/inc/include) -set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/bin/debug/amd64/jemalloc.lib) -set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/bin/retail/amd64/jemalloc.lib) +set(ZSTD_HOME $ENV{THIRDPARTY_HOME}/ZSTD.Library) +set(ZSTD_INCLUDE ${ZSTD_HOME}/build/native/inc) +set(ZSTD_LIB_DEBUG ${ZSTD_HOME}/lib/native/debug/amd64/libzstd_static.lib) +set(ZSTD_LIB_RELEASE ${ZSTD_HOME}/lib/native/retail/amd64/libzstd_static.lib) + +# For compatibility +if (ZSTD) + set(WITH_ZSTD ON) +endif () + +if (WITH_ZSTD) + message(STATUS "ZSTD library is enabled") + + if(DEFINED ENV{ZSTD_INCLUDE}) + set(ZSTD_INCLUDE $ENV{ZSTD_INCLUDE}) + endif() + + if(DEFINED ENV{ZSTD_LIB_DEBUG}) + set(ZSTD_LIB_DEBUG $ENV{ZSTD_LIB_DEBUG}) + endif() + + if(DEFINED ENV{ZSTD_LIB_RELEASE}) + set(ZSTD_LIB_RELEASE $ENV{ZSTD_LIB_RELEASE}) + endif() + + # ZSTD_STATIC_LINKING_ONLY only allows us to create an allocation functions override + # When jemalloc is in use + set(ZSTD_LIBS debug ${ZSTD_LIB_DEBUG} optimized ${ZSTD_LIB_RELEASE}) + + add_definitions(-DZSTD -DZSTD_STATIC_LINKING_ONLY) + include_directories(${ZSTD_INCLUDE}) + set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${ZSTD_LIBS}) +else () + message(STATUS "ZSTD library is disabled") +endif () -# ================================================== JEMALLOC ================================================== # -# Don't touch these lines +# Edit these 4 lines to define paths to Jemalloc # +set(JEMALLOC_HOME $ENV{THIRDPARTY_HOME}/Jemalloc.Library) +set(JEMALLOC_INCLUDE ${JEMALLOC_HOME}/build/native/inc) +set(JEMALLOC_LIB_DEBUG ${JEMALLOC_HOME}/lib/native/debug/amd64/jemalloc.lib) +set(JEMALLOC_LIB_RELEASE ${JEMALLOC_HOME}/lib/native/retail/amd64/jemalloc.lib) -# For compatibilty +# ================================================== JEMALLOC ================================================== if(JEMALLOC) set(WITH_JEMALLOC ON) endif() @@ -226,9 +260,7 @@ if (WITH_JEMALLOC) include_directories(${JEMALLOC_INCLUDE}) set (THIRDPARTY_LIBS ${THIRDPARTY_LIBS} ${JEMALLOC_LIBS}) set (ARTIFACT_SUFFIX "_je") - - set(WITH_JEMALLOC ON) - + else () set (ARTIFACT_SUFFIX "") message(STATUS "JEMALLOC library is disabled") diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 79a06b4fe..a992562a3 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -1941,27 +1941,28 @@ class Benchmark { return true; } - inline bool CompressSlice(const Slice& input, std::string* compressed) { + inline bool CompressSlice(const CompressionContext& compression_ctx, + const Slice& input, std::string* compressed) { bool ok = true; switch (FLAGS_compression_type_e) { case rocksdb::kSnappyCompression: - ok = Snappy_Compress(Options().compression_opts, input.data(), + ok = Snappy_Compress(compression_ctx, input.data(), input.size(), compressed); break; case rocksdb::kZlibCompression: - ok = Zlib_Compress(Options().compression_opts, 2, input.data(), + ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kBZip2Compression: - ok = BZip2_Compress(Options().compression_opts, 2, input.data(), + ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4Compression: - ok = LZ4_Compress(Options().compression_opts, 2, input.data(), + ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kLZ4HCCompression: - ok = LZ4HC_Compress(Options().compression_opts, 2, 
input.data(), + ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(), compressed); break; case rocksdb::kXpressCompression: @@ -1969,8 +1970,8 @@ class Benchmark { input.size(), compressed); break; case rocksdb::kZSTD: - ok = ZSTD_Compress(Options().compression_opts, input.data(), - input.size(), compressed); + ok = ZSTD_Compress(compression_ctx, input.data(), + input.size(), compressed); break; default: ok = false; @@ -2052,7 +2053,8 @@ class Benchmark { const int len = FLAGS_block_size; std::string input_str(len, 'y'); std::string compressed; - bool result = CompressSlice(Slice(input_str), &compressed); + CompressionContext compression_ctx(FLAGS_compression_type_e, Options().compression_opts); + bool result = CompressSlice(compression_ctx, Slice(input_str), &compressed); if (!result) { fprintf(stdout, "WARNING: %s compression is not enabled\n", @@ -2194,10 +2196,11 @@ class Benchmark { merge_keys_(FLAGS_merge_keys < 0 ? FLAGS_num : FLAGS_merge_keys), report_file_operations_(FLAGS_report_file_operations), #ifndef ROCKSDB_LITE - use_blob_db_(FLAGS_use_blob_db) { + use_blob_db_(FLAGS_use_blob_db) #else - use_blob_db_(false) { + use_blob_db_(false) #endif // !ROCKSDB_LITE + { // use simcache instead of cache if (FLAGS_simcache_size >= 0) { if (FLAGS_cache_numshardbits >= 1) { @@ -2843,11 +2846,13 @@ void VerifyDBFromDB(std::string& truth_db_name) { int64_t produced = 0; bool ok = true; std::string compressed; + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); // Compress 1G while (ok && bytes < int64_t(1) << 30) { compressed.clear(); - ok = CompressSlice(input, &compressed); + ok = CompressSlice(compression_ctx, input, &compressed); produced += compressed.size(); bytes += input.size(); thread->stats.FinishedOps(nullptr, nullptr, 1, kCompress); @@ -2869,7 +2874,11 @@ void VerifyDBFromDB(std::string& truth_db_name) { Slice input = gen.Generate(FLAGS_block_size); std::string compressed; - bool ok = CompressSlice(input, &compressed); + UncompressionContext uncompression_ctx(FLAGS_compression_type_e); + CompressionContext compression_ctx(FLAGS_compression_type_e, + Options().compression_opts); + + bool ok = CompressSlice(compression_ctx, input, &compressed); int64_t bytes = 0; int decompress_size; while (ok && bytes < 1024 * 1048576) { @@ -2889,7 +2898,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { break; } case rocksdb::kZlibCompression: - uncompressed = Zlib_Uncompress(compressed.data(), compressed.size(), + uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -2899,12 +2909,14 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kLZ4Compression: - uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; case rocksdb::kLZ4HCCompression: - uncompressed = LZ4_Uncompress(compressed.data(), compressed.size(), + uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), + compressed.size(), &decompress_size, 2); ok = uncompressed != nullptr; break; @@ -2914,7 +2926,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { ok = uncompressed != nullptr; break; case rocksdb::kZSTD: - uncompressed = ZSTD_Uncompress(compressed.data(), compressed.size(), + uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(), + 
compressed.size(), &decompress_size); ok = uncompressed != nullptr; break; diff --git a/util/compression.h b/util/compression.h index 69df6695f..ef57ee11b 100644 --- a/util/compression.h +++ b/util/compression.h @@ -15,6 +15,7 @@ #include "rocksdb/options.h" #include "util/coding.h" +#include "util/compression_context_cache.h" #ifdef SNAPPY #include @@ -38,14 +39,215 @@ #if ZSTD_VERSION_NUMBER >= 800 // v0.8.0+ #include #endif // ZSTD_VERSION_NUMBER >= 800 +namespace rocksdb { +// Need this for the context allocation override +// On windows we need to do this explicitly +#if (ZSTD_VERSION_NUMBER >= 500) +#if defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && \ + defined(ZSTD_STATIC_LINKING_ONLY) +#define ROCKSDB_ZSTD_CUSTOM_MEM +namespace port { +ZSTD_customMem GetJeZstdAllocationOverrides(); +} // namespace port +#endif // defined(ROCKSDB_JEMALLOC) && defined(OS_WIN) && + // defined(ZSTD_STATIC_LINKING_ONLY) + +// Cached data represents a portion that can be re-used +// If, in the future we have more than one native context to +// cache we can arrange this as a tuple +class ZSTDUncompressCachedData { +public: + using ZSTDNativeContext = ZSTD_DCtx*; + ZSTDUncompressCachedData() {} + // Init from cache + ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete; + ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; + ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT : + ZSTDUncompressCachedData() { + *this = std::move(o); + } + ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT { + assert(zstd_ctx_ == nullptr); + std::swap(zstd_ctx_,o.zstd_ctx_); + std::swap(cache_idx_,o.cache_idx_); + return *this; + } + ZSTDNativeContext Get() const { + return zstd_ctx_; + } + int64_t GetCacheIndex() const { + return cache_idx_; + } + void CreateIfNeeded() { + if (zstd_ctx_ == nullptr) { +#ifdef ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides()); +#else // ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createDCtx(); +#endif // ROCKSDB_ZSTD_CUSTOM_MEM + cache_idx_ = -1; + } + } + void InitFromCache(const ZSTDUncompressCachedData& o, int64_t idx) { + zstd_ctx_ = o.zstd_ctx_; + cache_idx_ = idx; + } + ~ZSTDUncompressCachedData() { + if (zstd_ctx_ != nullptr && cache_idx_ == -1) { + ZSTD_freeDCtx(zstd_ctx_); + } + } +private: + ZSTDNativeContext zstd_ctx_ = nullptr; + int64_t cache_idx_ = -1; // -1 means this instance owns the context +}; +#endif // (ZSTD_VERSION_NUMBER >= 500) +} // namespace rocksdb #endif // ZSTD +#if !(defined ZSTD) || !(ZSTD_VERSION_NUMBER >= 500) +namespace rocksdb { +class ZSTDUncompressCachedData { + void* padding; // unused +public: + using ZSTDNativeContext = void*; + ZSTDUncompressCachedData() {} + ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {} + ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; + ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; + ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; + ZSTDNativeContext Get() const { + return nullptr; + } + int64_t GetCacheIndex() const { + return -1; + } + void CreateIfNeeded() {} + void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {} +}; +} // namespace rocksdb +#endif + #if defined(XPRESS) #include "port/xpress.h" #endif namespace rocksdb { +// Instantiate this class and pass it to the uncompression API below +class CompressionContext { +private: + const CompressionType 
type_; + const CompressionOptions opts_; + Slice dict_; +#if defined(ZSTD) && (ZSTD_VERSION_NUMBER >= 500) + ZSTD_CCtx* zstd_ctx_ = nullptr; + void CreateNativeContext() { + if (type_ == kZSTD) { +#ifdef ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides()); +#else // ROCKSDB_ZSTD_CUSTOM_MEM + zstd_ctx_ = ZSTD_createCCtx(); +#endif // ROCKSDB_ZSTD_CUSTOM_MEM + } + } + void DestroyNativeContext() { + if (zstd_ctx_ != nullptr) { + ZSTD_freeCCtx(zstd_ctx_); + } + } +public: + // callable inside ZSTD_Compress + ZSTD_CCtx * ZSTDPreallocCtx() const { + assert(type_ == kZSTD); + return zstd_ctx_; + } +#else // ZSTD && (ZSTD_VERSION_NUMBER >= 500) +private: + void CreateNativeContext() {} + void DestroyNativeContext() {} +#endif //ZSTD && (ZSTD_VERSION_NUMBER >= 500) +public: + explicit CompressionContext(CompressionType comp_type) : + type_(comp_type) { + CreateNativeContext(); + } + CompressionContext(CompressionType comp_type, + const CompressionOptions& opts, + const Slice& comp_dict = Slice()) : + type_(comp_type), + opts_(opts), + dict_(comp_dict) { + CreateNativeContext(); + } + ~CompressionContext() { + DestroyNativeContext(); + } + CompressionContext(const CompressionContext&) = delete; + CompressionContext& operator=(const CompressionContext&) = delete; + + const CompressionOptions& options() const { + return opts_; + } + CompressionType type() const { + return type_; + } + const Slice& dict() const { + return dict_; + } + Slice& dict() { + return dict_; + } +}; + +// Instantiate this class and pass it to the uncompression API below +class UncompressionContext { +private: + CompressionType type_; + Slice dict_; + CompressionContextCache* ctx_cache_ = nullptr; + ZSTDUncompressCachedData uncomp_cached_data_; +public: + struct NoCache {}; + // Do not use context cache, used by TableBuilder + UncompressionContext(NoCache, CompressionType comp_type) : + type_(comp_type) { + } + explicit UncompressionContext(CompressionType comp_type) : + UncompressionContext(comp_type, Slice()) { + } + UncompressionContext(CompressionType comp_type, const Slice& comp_dict) : + type_(comp_type), dict_(comp_dict) { + if (type_ == kZSTD) { + ctx_cache_ = CompressionContextCache::Instance(); + uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); + } + } + ~UncompressionContext() { + if (type_ == kZSTD && + uncomp_cached_data_.GetCacheIndex() != -1) { + assert(ctx_cache_ != nullptr); + ctx_cache_->ReturnCachedZSTDUncompressData( + uncomp_cached_data_.GetCacheIndex()); + } + } + UncompressionContext(const UncompressionContext&) = delete; + UncompressionContext& operator=(const UncompressionContext&) = delete; + + ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { + return uncomp_cached_data_.Get(); + } + CompressionType type() const { + return type_; + } + const Slice& dict() const { + return dict_; + } + Slice& dict() { + return dict_; + } +}; + inline bool Snappy_Supported() { #ifdef SNAPPY return true; @@ -162,7 +364,7 @@ inline std::string CompressionTypeToString(CompressionType compression_type) { // 2 -- Zlib, BZip2 and LZ4 encode decompressed size as Varint32 just before the // start of compressed block. Snappy format is the same as version 1. 
-inline bool Snappy_Compress(const CompressionOptions& /*opts*/, +inline bool Snappy_Compress(const CompressionContext& /*ctx*/, const char* input, size_t length, ::std::string* output) { #ifdef SNAPPY @@ -229,10 +431,9 @@ inline bool GetDecompressedSizeInfo(const char** input_data, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. -inline bool Zlib_Compress(const CompressionOptions& opts, +inline bool Zlib_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef ZLIB if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -255,24 +456,24 @@ inline bool Zlib_Compress(const CompressionOptions& opts, // The default value is 8. See zconf.h for more details. static const int memLevel = 8; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = Z_DEFAULT_COMPRESSION; } else { - level = opts.level; + level = ctx.options().level; } z_stream _stream; memset(&_stream, 0, sizeof(z_stream)); - int st = deflateInit2(&_stream, level, Z_DEFLATED, opts.window_bits, - memLevel, opts.strategy); + int st = deflateInit2(&_stream, level, Z_DEFLATED, ctx.options().window_bits, + memLevel, ctx.options().strategy); if (st != Z_OK) { return false; } - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary st = deflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + &_stream, reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { deflateEnd(&_stream); return false; @@ -300,12 +501,11 @@ inline bool Zlib_Compress(const CompressionOptions& opts, deflateEnd(&_stream); return compressed; #else - (void)opts; + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -316,10 +516,10 @@ inline bool Zlib_Compress(const CompressionOptions& opts, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* Zlib_Uncompress(const char* input_data, size_t input_length, +inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, int* decompress_size, uint32_t compress_format_version, - const Slice& compression_dict = Slice(), int windowBits = -14) { #ifdef ZLIB uint32_t output_len = 0; @@ -349,11 +549,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, return nullptr; } - if (compression_dict.size()) { + if (ctx.dict().size()) { // Initialize the compression library's dictionary st = inflateSetDictionary( - &_stream, reinterpret_cast(compression_dict.data()), - static_cast(compression_dict.size())); + &_stream, reinterpret_cast(ctx.dict().data()), + static_cast(ctx.dict().size())); if (st != Z_OK) { return nullptr; } @@ -406,11 +606,11 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, inflateEnd(&_stream); return output; #else + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; (void)compress_format_version; - (void)compression_dict; (void)windowBits; return nullptr; #endif @@ -420,7 +620,7 @@ inline char* Zlib_Uncompress(const char* input_data, size_t input_length, // block header // compress_format_version == 2 -- decompressed size is included in the block // header in varint32 format -inline bool BZip2_Compress(const CompressionOptions& /*opts*/, +inline bool BZip2_Compress(const CompressionContext& /*ctx*/, uint32_t compress_format_version, const char* input, size_t length, ::std::string* output) { #ifdef BZIP2 @@ -567,10 +767,9 @@ inline char* BZip2_Uncompress(const char* input_data, size_t input_length, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. -inline bool LZ4_Compress(const CompressionOptions& /*opts*/, +inline bool LZ4_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -596,9 +795,9 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, int outlen; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_stream_t* stream = LZ4_createStream(); - if (compression_dict.size()) { - LZ4_loadDict(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_loadDict(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } #if LZ4_VERSION_NUMBER >= 10700 // r129+ outlen = @@ -621,11 +820,11 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -636,10 +835,10 @@ inline bool LZ4_Compress(const CompressionOptions& /*opts*/, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline char* LZ4_Uncompress(const char* input_data, size_t input_length, +inline char* LZ4_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, int* decompress_size, - uint32_t compress_format_version, - const Slice& compression_dict = Slice()) { + uint32_t compress_format_version) { #ifdef LZ4 uint32_t output_len = 0; if (compress_format_version == 2) { @@ -662,9 +861,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, char* output = new char[output_len]; #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamDecode_t* stream = LZ4_createStreamDecode(); - if (compression_dict.size()) { - LZ4_setStreamDecode(stream, compression_dict.data(), - static_cast(compression_dict.size())); + if (ctx.dict().size()) { + LZ4_setStreamDecode(stream, ctx.dict().data(), + static_cast(ctx.dict().size())); } *decompress_size = LZ4_decompress_safe_continue( stream, input_data, output, static_cast(input_length), @@ -683,11 +882,11 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, assert(*decompress_size == static_cast(output_len)); return output; #else // LZ4 + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; (void)compress_format_version; - (void)compression_dict; return nullptr; #endif } @@ -698,10 +897,9 @@ inline char* LZ4_Uncompress(const char* input_data, size_t input_length, // header in varint32 format // @param compression_dict Data for presetting the compression library's // dictionary. -inline bool LZ4HC_Compress(const CompressionOptions& opts, +inline bool LZ4HC_Compress(const CompressionContext& ctx, uint32_t compress_format_version, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { + size_t length, ::std::string* output) { #ifdef LZ4 if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -726,17 +924,17 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, int outlen; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { level = 0; // lz4hc.h says any value < 1 will be sanitized to default } else { - level = opts.level; + level = ctx.options().level; } #if LZ4_VERSION_NUMBER >= 10400 // r124+ LZ4_streamHC_t* stream = LZ4_createStreamHC(); LZ4_resetStreamHC(stream, level); const char* compression_dict_data = - compression_dict.size() > 0 ? compression_dict.data() : nullptr; - size_t compression_dict_size = compression_dict.size(); + ctx.dict().size() > 0 ? ctx.dict().data() : nullptr; + size_t compression_dict_size = ctx.dict().size(); LZ4_loadDictHC(stream, compression_dict_data, static_cast(compression_dict_size)); @@ -767,12 +965,11 @@ inline bool LZ4HC_Compress(const CompressionOptions& opts, output->resize(static_cast(output_header_len + outlen)); return true; #else // LZ4 - (void)opts; + (void)ctx; (void)compress_format_version; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } @@ -804,9 +1001,8 @@ inline char* XPRESS_Uncompress(const char* /*input_data*/, // @param compression_dict Data for presetting the compression library's // dictionary. 
-inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, - size_t length, ::std::string* output, - const Slice& compression_dict = Slice()) { +inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input, + size_t length, ::std::string* output) { #ifdef ZSTD if (length > std::numeric_limits::max()) { // Can't compress more than 4GB @@ -818,21 +1014,21 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, size_t compressBound = ZSTD_compressBound(length); output->resize(static_cast(output_header_len + compressBound)); - size_t outlen; + size_t outlen = 0; int level; - if (opts.level == CompressionOptions::kDefaultCompressionLevel) { + if (ctx.options().level == CompressionOptions::kDefaultCompressionLevel) { // 3 is the value of ZSTD_CLEVEL_DEFAULT (not exposed publicly), see // https://github.com/facebook/zstd/issues/1148 level = 3; } else { - level = opts.level; + level = ctx.options().level; } #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_CCtx* context = ZSTD_createCCtx(); + ZSTD_CCtx* context = ctx.ZSTDPreallocCtx(); + assert(context != nullptr); outlen = ZSTD_compress_usingDict( context, &(*output)[output_header_len], compressBound, input, length, - compression_dict.data(), compression_dict.size(), level); - ZSTD_freeCCtx(context); + ctx.dict().data(), ctx.dict().size(), level); #else // up to v0.4.x outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, length, level); @@ -843,20 +1039,19 @@ inline bool ZSTD_Compress(const CompressionOptions& opts, const char* input, output->resize(output_header_len + outlen); return true; #else // ZSTD - (void)opts; + (void)ctx; (void)input; (void)length; (void)output; - (void)compression_dict; return false; #endif } // @param compression_dict Data for presetting the compression library's // dictionary. -inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, - int* decompress_size, - const Slice& compression_dict = Slice()) { +inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_data, + size_t input_length, + int* decompress_size) { #ifdef ZSTD uint32_t output_len = 0; if (!compression::GetDecompressedSizeInfo(&input_data, &input_length, @@ -867,11 +1062,11 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, char* output = new char[output_len]; size_t actual_output_length; #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ - ZSTD_DCtx* context = ZSTD_createDCtx(); + ZSTD_DCtx* context = ctx.GetZSTDContext(); + assert(context != nullptr); actual_output_length = ZSTD_decompress_usingDict( context, output, output_len, input_data, input_length, - compression_dict.data(), compression_dict.size()); - ZSTD_freeDCtx(context); + ctx.dict().data(), ctx.dict().size()); #else // up to v0.4.x actual_output_length = ZSTD_decompress(output, output_len, input_data, input_length); @@ -880,10 +1075,10 @@ inline char* ZSTD_Uncompress(const char* input_data, size_t input_length, *decompress_size = static_cast(actual_output_length); return output; #else // ZSTD + (void)ctx; (void)input_data; (void)input_length; (void)decompress_size; - (void)compression_dict; return nullptr; #endif } diff --git a/util/compression_context_cache.cc b/util/compression_context_cache.cc new file mode 100644 index 000000000..cd34eb1de --- /dev/null +++ b/util/compression_context_cache.cc @@ -0,0 +1,111 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. 
+// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// + +#include "util/compression_context_cache.h" + +#include "util/compression.h" +#include "util/core_local.h" + +#include <atomic> + +namespace rocksdb { +namespace compression_cache { + +void* const SentinelValue = nullptr; +// Cache ZSTD uncompression contexts for reads. +// If needed, we can add ZSTD compression context caching, +// which is currently not done since BlockBasedTableBuilder +// simply creates one compression context per new SST file. +struct ZSTDCachedData { + // We choose to cache the structure below instead of a pointer + // because we want to a) avoid leaking native types and b) make + // the cache use transparent to the user + ZSTDUncompressCachedData uncomp_cached_data_; + std::atomic<void*> zstd_uncomp_sentinel_; + + char padding[(CACHE_LINE_SIZE - + (sizeof(ZSTDUncompressCachedData) + + sizeof(std::atomic<void*>)) % + CACHE_LINE_SIZE)]; // unused padding field + + ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {} + ZSTDCachedData(const ZSTDCachedData&) = delete; + ZSTDCachedData& operator=(const ZSTDCachedData&) = delete; + + ZSTDUncompressCachedData GetUncompressData(int64_t idx) { + ZSTDUncompressCachedData result; + void* expected = &uncomp_cached_data_; + if (zstd_uncomp_sentinel_.compare_exchange_strong(expected, SentinelValue)) { + uncomp_cached_data_.CreateIfNeeded(); + result.InitFromCache(uncomp_cached_data_, idx); + } else { + // Creates one-time-use data + result.CreateIfNeeded(); + } + return result; + } + // Return the entry back into circulation. + // This is executed only when we successfully obtained + // the cached entry in the first place + void ReturnUncompressData() { + if (zstd_uncomp_sentinel_.exchange(&uncomp_cached_data_) != SentinelValue) { + // Means we are returning an entry we never acquired.
+ assert(false); + } + } +}; +static_assert(sizeof(ZSTDCachedData) % CACHE_LINE_SIZE == 0, "Expected CACHE_LINE_SIZE alignment"); +} // compression_cache + +using namespace compression_cache; + +class CompressionContextCache::Rep { +public: + Rep() { + } + ZSTDUncompressCachedData GetZSTDUncompressData() { + auto p = per_core_uncompr_.AccessElementAndIndex(); + int64_t idx = static_cast<int64_t>(p.second); + return p.first->GetUncompressData(idx); + } + void ReturnZSTDUncompressData(int64_t idx) { + assert(idx >= 0); + auto* cn = per_core_uncompr_.AccessAtCore(static_cast<size_t>(idx)); + cn->ReturnUncompressData(); + } +private: + CoreLocalArray<ZSTDCachedData> per_core_uncompr_; +}; + +CompressionContextCache::CompressionContextCache() : + rep_(new Rep()) { +} + +CompressionContextCache* CompressionContextCache::Instance() { + static CompressionContextCache instance; + return &instance; +} + +void CompressionContextCache::InitSingleton() { + Instance(); +} + +ZSTDUncompressCachedData CompressionContextCache::GetCachedZSTDUncompressData() { + return rep_->GetZSTDUncompressData(); +} + +void CompressionContextCache::ReturnCachedZSTDUncompressData(int64_t idx) { + rep_->ReturnZSTDUncompressData(idx); +} + +CompressionContextCache::~CompressionContextCache() { + delete rep_; +} + +} diff --git a/util/compression_context_cache.h b/util/compression_context_cache.h new file mode 100644 index 000000000..2b37cfbd8 --- /dev/null +++ b/util/compression_context_cache.h @@ -0,0 +1,45 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// + +// The compression context cache allows caching of compression/uncompression contexts. +// This helps with random-read latencies and reduces CPU utilization. +// Caching is implemented using the CoreLocal facility. Compression/Uncompression +// instances are cached on a per-core basis using CoreLocalArray. A borrowed +// instance is atomically replaced with a sentinel value for the time it is in use. +// If it turns out that another thread is already using the instance, we still +// create one on the heap, which is later destroyed.
+ +#pragma once + +#include + +namespace rocksdb { +class ZSTDUncompressCachedData; + +class CompressionContextCache { +public: + // Singleton + static CompressionContextCache* Instance(); + static void InitSingleton(); + CompressionContextCache(const CompressionContextCache&) = delete; + CompressionContextCache& operator=(const CompressionContextCache&) = delete; + + ZSTDUncompressCachedData GetCachedZSTDUncompressData(); + void ReturnCachedZSTDUncompressData(int64_t idx); + +private: + // Singleton + CompressionContextCache(); + ~CompressionContextCache(); + + class Rep; + Rep* rep_; +}; + +} diff --git a/util/thread_local.h b/util/thread_local.h index 175f1cca4..5dad72921 100644 --- a/util/thread_local.h +++ b/util/thread_local.h @@ -45,6 +45,9 @@ class ThreadLocalPtr { public: explicit ThreadLocalPtr(UnrefHandler handler = nullptr); + ThreadLocalPtr(const ThreadLocalPtr&) = delete; + ThreadLocalPtr& operator=(const ThreadLocalPtr&) = delete; + ~ThreadLocalPtr(); // Return the current pointer stored in thread local diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc index a4a1162bd..d2857f3a1 100644 --- a/utilities/blob_db/blob_db_impl.cc +++ b/utilities/blob_db/blob_db_impl.cc @@ -773,9 +773,9 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw, } StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS); CompressionType ct = bdb_options_.compression; - CompressionOptions compression_opts; - CompressBlock(raw, compression_opts, &ct, kBlockBasedTableVersionFormat, - Slice(), compression_output); + CompressionContext compression_ctx(ct); + CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat, + compression_output); return *compression_output; } @@ -1120,9 +1120,11 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry, { StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS); + UncompressionContext uncompression_ctx(bfile->compression()); s = UncompressBlockContentsForCompressionType( + uncompression_ctx, blob_value.data(), blob_value.size(), &contents, - kBlockBasedTableVersionFormat, Slice(), bfile->compression(), + kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions())); } value->PinSelf(contents.data); diff --git a/utilities/blob_db/blob_dump_tool.cc b/utilities/blob_db/blob_dump_tool.cc index c527bc9d5..aa45ebee6 100644 --- a/utilities/blob_db/blob_dump_tool.cc +++ b/utilities/blob_db/blob_dump_tool.cc @@ -205,9 +205,11 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, if (compression != kNoCompression && (show_uncompressed_blob != DisplayType::kNone || show_summary)) { BlockContents contents; + UncompressionContext uncompression_ctx(compression); s = UncompressBlockContentsForCompressionType( + uncompression_ctx, slice.data() + key_size, value_size, &contents, - 2 /*compress_format_version*/, Slice(), compression, + 2 /*compress_format_version*/, ImmutableCFOptions(Options())); if (!s.ok()) { return s; diff --git a/utilities/column_aware_encoding_util.cc b/utilities/column_aware_encoding_util.cc index 6d15e77ad..1dffd32ab 100644 --- a/utilities/column_aware_encoding_util.cc +++ b/utilities/column_aware_encoding_util.cc @@ -84,16 +84,16 @@ void ColumnAwareEncodingReader::DecodeBlocks( auto& slice_final_with_bit = block; uint32_t format_version = 2; - Slice compression_dict; BlockContents contents; const char* content_ptr; CompressionType type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != 
kNoCompression) { - UncompressBlockContents(slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, - format_version, compression_dict, ioptions); + format_version, ioptions); content_ptr = contents.data.data(); } else { content_ptr = slice_final_with_bit.data(); @@ -166,16 +166,17 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat( for (auto& block : *blocks) { auto& slice_final_with_bit = block; uint32_t format_version = 2; - Slice compression_dict; BlockContents contents; std::string decoded_content; CompressionType type = (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; if (type != kNoCompression) { - UncompressBlockContents(slice_final_with_bit.c_str(), + UncompressionContext uncompression_ctx(type); + UncompressBlockContents(uncompression_ctx, + slice_final_with_bit.c_str(), slice_final_with_bit.size() - 1, &contents, - format_version, compression_dict, ioptions); + format_version, ioptions); decoded_content = std::string(contents.data.data(), contents.data.size()); } else { decoded_content = std::move(slice_final_with_bit); @@ -244,12 +245,11 @@ namespace { void CompressDataBlock(const std::string& output_content, Slice* slice_final, CompressionType* type, std::string* compressed_output) { - CompressionOptions compression_opts; + CompressionContext compression_ctx(*type); uint32_t format_version = 2; // hard-coded version - Slice compression_dict; *slice_final = - CompressBlock(output_content, compression_opts, type, format_version, - compression_dict, compressed_output); + CompressBlock(output_content, compression_ctx, type, format_version, + compressed_output); } } // namespace
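
The patch above threads CompressionContext/UncompressionContext objects through every compression call in place of the old CompressionOptions-plus-dictionary-slice pairs. A minimal round-trip sketch of the new API, based only on the signatures visible in this diff and mirroring the CompressSlice()/decompress changes in tools/db_bench_tool.cc; the helper name RoundTripZSTD and the surrounding buffer handling are illustrative, not part of the patch, and a ZSTD-enabled build is assumed:

// Sketch (not part of the patch): exercise the new context-based helpers.
#include <cassert>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "util/compression.h"

namespace rocksdb {

bool RoundTripZSTD(const Slice& input) {
  // One context per writer; it owns the native ZSTD_CCtx when ZSTD >= 0.5.0
  // is compiled in (see CompressionContext::CreateNativeContext()).
  CompressionContext compression_ctx(kZSTD, CompressionOptions());
  std::string compressed;
  if (!ZSTD_Compress(compression_ctx, input.data(), input.size(),
                     &compressed)) {
    return false;  // ZSTD not compiled in, or input too large
  }

  // Readers borrow a cached ZSTD_DCtx from the per-core
  // CompressionContextCache and return it when the context is destroyed.
  UncompressionContext uncompression_ctx(kZSTD);
  int decompressed_size = 0;
  char* output = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
                                 compressed.size(), &decompressed_size);
  if (output == nullptr) {
    return false;
  }
  assert(static_cast<size_t>(decompressed_size) == input.size());
  delete[] output;  // ZSTD_Uncompress allocates the result with new char[]
  return true;
}

}  // namespace rocksdb

Per the diff, the table builder keeps one CompressionContext per SST file (plus an optional verify_ctx constructed with UncompressionContext::NoCache()), while read paths construct short-lived UncompressionContext objects that draw on the cache.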
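The per-core borrow/return protocol in util/compression_context_cache.cc hinges on a single atomic sentinel per slot: a successful compare-and-swap hands out the cached context, and anything else falls back to a heap-allocated one-time context. A stripped-down, self-contained model of that protocol, assuming simplified names (Slot, Borrow, Return are hypothetical; the real code stores a ZSTDUncompressCachedData per core inside a CoreLocalArray):

#include <atomic>
#include <cassert>

// Simplified model of one per-core cache slot. A non-null sentinel means the
// cached object is available; nullptr means some thread has borrowed it.
struct Slot {
  int cached_object = 0;                        // stands in for the cached DCtx
  std::atomic<void*> sentinel{&cached_object};  // available when non-null

  // Try to borrow the cached object. On failure the caller creates a
  // one-time-use object instead (as GetUncompressData() does).
  bool Borrow() {
    void* expected = &cached_object;
    return sentinel.compare_exchange_strong(expected, nullptr);
  }

  // Put the slot back into circulation; only legal after a successful Borrow().
  void Return() {
    void* prev = sentinel.exchange(&cached_object);
    assert(prev == nullptr);  // returning an entry that was never acquired
    (void)prev;
  }
};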