run make format for PR 3838 (#3954)

Summary:
PR https://github.com/facebook/rocksdb/pull/3838 made some changes that triggers lint warnings.
Run `make format` to fix formatting as suggested by siying .
Also piggyback two changes:
1) fix singleton destruction order for windows and posix env
2) fix two clang warnings
Closes https://github.com/facebook/rocksdb/pull/3954

Differential Revision: D8272041

Pulled By: miasantreble

fbshipit-source-id: 7c4fd12bd17aac13534520de0c733328aa3c6c9f
main
Zhongyi Xie 7 years ago committed by Facebook Github Bot
parent 812c7371d3
commit f1592a06c2
  1. 1
      db/comparator_db_test.cc
  2. 2
      env/env_posix.cc
  3. 3
      port/win/env_default.cc
  4. 1
      port/win/win_jemalloc.cc
  5. 22
      table/block_based_table_builder.cc
  6. 3
      table/block_based_table_builder.h
  7. 13
      table/block_based_table_reader.cc
  8. 7
      table/block_fetcher.cc
  9. 25
      table/format.cc
  10. 12
      table/format.h
  11. 42
      tools/db_bench_tool.cc
  12. 3
      tools/db_stress.cc
  13. 126
      util/compression.h
  14. 35
      util/compression_context_cache.cc
  15. 8
      util/compression_context_cache.h
  16. 6
      utilities/blob_db/blob_db_impl.cc
  17. 6
      utilities/blob_db/blob_dump_tool.cc
  18. 8
      utilities/column_aware_encoding_util.cc

@ -260,7 +260,6 @@ class ComparatorDBTest
DB* db_; DB* db_;
Options last_options_; Options last_options_;
std::unique_ptr<const Comparator> comparator_guard; std::unique_ptr<const Comparator> comparator_guard;
uint32_t format_;
public: public:
ComparatorDBTest() : env_(Env::Default()), db_(nullptr) { ComparatorDBTest() : env_(Env::Default()), db_(nullptr) {

2
env/env_posix.cc vendored

@ -49,6 +49,7 @@
#include "rocksdb/options.h" #include "rocksdb/options.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/compression_context_cache.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/random.h" #include "util/random.h"
#include "util/string_util.h" #include "util/string_util.h"
@ -1057,6 +1058,7 @@ Env* Env::Default() {
// the destructor of static PosixEnv will go first, then the // the destructor of static PosixEnv will go first, then the
// the singletons of ThreadLocalPtr. // the singletons of ThreadLocalPtr.
ThreadLocalPtr::InitSingletons(); ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS(); INIT_SYNC_POINT_SINGLETONS();
static PosixEnv default_env; static PosixEnv default_env;
return &default_env; return &default_env;

@ -12,6 +12,7 @@
#include <rocksdb/env.h> #include <rocksdb/env.h>
#include "port/win/env_win.h" #include "port/win/env_win.h"
#include "util/compression_context_cache.h" #include "util/compression_context_cache.h"
#include "util/sync_point.h"
#include "util/thread_local.h" #include "util/thread_local.h"
namespace rocksdb { namespace rocksdb {
@ -32,9 +33,9 @@ Env* Env::Default() {
using namespace port; using namespace port;
ThreadLocalPtr::InitSingletons(); ThreadLocalPtr::InitSingletons();
CompressionContextCache::InitSingleton(); CompressionContextCache::InitSingleton();
INIT_SYNC_POINT_SINGLETONS();
std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); }); std::call_once(winenv_once_flag, []() { envptr = new WinEnv(); });
return envptr; return envptr;
} }
} }

@ -63,4 +63,3 @@ void operator delete[](void* p) {
je_free(p); je_free(p);
} }
} }

@ -103,8 +103,7 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
} // namespace } // namespace
// format_version is the block format as defined in include/rocksdb/table.h // format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw, Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
std::string* compressed_output) { std::string* compressed_output) {
*type = compression_ctx.type(); *type = compression_ctx.type();
@ -350,8 +349,7 @@ struct BlockBasedTableBuilder::Rep {
Rep(const Rep&) = delete; Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete; Rep& operator=(const Rep&) = delete;
~Rep() { ~Rep() {}
}
}; };
BlockBasedTableBuilder::BlockBasedTableBuilder( BlockBasedTableBuilder::BlockBasedTableBuilder(
@ -513,9 +511,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
} }
} }
block_contents = CompressBlock(raw_block_contents, r->compression_ctx, block_contents =
&type, r->table_options.format_version, CompressBlock(raw_block_contents, r->compression_ctx, &type,
&r->compressed_output); r->table_options.format_version, &r->compressed_output);
// Some of the compression algorithms are known to be unreliable. If // Some of the compression algorithms are known to be unreliable. If
// the verify_compression flag is set then try to de-compress the // the verify_compression flag is set then try to de-compress the
@ -523,10 +521,9 @@ void BlockBasedTableBuilder::WriteBlock(const Slice& raw_block_contents,
if (type != kNoCompression && r->table_options.verify_compression) { if (type != kNoCompression && r->table_options.verify_compression) {
// Retrieve the uncompressed contents into a new buffer // Retrieve the uncompressed contents into a new buffer
BlockContents contents; BlockContents contents;
Status stat = UncompressBlockContentsForCompressionType(*r->verify_ctx, Status stat = UncompressBlockContentsForCompressionType(
block_contents.data(), block_contents.size(), &contents, *r->verify_ctx, block_contents.data(), block_contents.size(),
r->table_options.format_version, &contents, r->table_options.format_version, r->ioptions);
r->ioptions);
if (stat.ok()) { if (stat.ok()) {
bool compressed_ok = contents.data.compare(raw_block_contents) == 0; bool compressed_ok = contents.data.compare(raw_block_contents) == 0;
@ -759,7 +756,8 @@ Status BlockBasedTableBuilder::Finish() {
r->props.merge_operator_name = r->ioptions.merge_operator != nullptr r->props.merge_operator_name = r->ioptions.merge_operator != nullptr
? r->ioptions.merge_operator->Name() ? r->ioptions.merge_operator->Name()
: "nullptr"; : "nullptr";
r->props.compression_name = CompressionTypeToString(r->compression_ctx.type()); r->props.compression_name =
CompressionTypeToString(r->compression_ctx.type());
r->props.prefix_extractor_name = r->props.prefix_extractor_name =
r->moptions.prefix_extractor != nullptr r->moptions.prefix_extractor != nullptr
? r->moptions.prefix_extractor->Name() ? r->moptions.prefix_extractor->Name()

@ -122,8 +122,7 @@ class BlockBasedTableBuilder : public TableBuilder {
const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max(); const uint64_t kCompressionSizeLimit = std::numeric_limits<int>::max();
}; };
Slice CompressBlock(const Slice& raw, Slice CompressBlock(const Slice& raw, const CompressionContext& compression_ctx,
const CompressionContext& compression_ctx,
CompressionType* type, uint32_t format_version, CompressionType* type, uint32_t format_version,
std::string* compressed_output); std::string* compressed_output);

@ -1104,11 +1104,9 @@ Status BlockBasedTable::GetDataBlockFromCache(
BlockContents contents; BlockContents contents;
UncompressionContext uncompresssion_ctx(compressed_block->compression_type(), UncompressionContext uncompresssion_ctx(compressed_block->compression_type(),
compression_dict); compression_dict);
s = UncompressBlockContents(uncompresssion_ctx, s = UncompressBlockContents(uncompresssion_ctx, compressed_block->data(),
compressed_block->data(),
compressed_block->size(), &contents, compressed_block->size(), &contents,
format_version, format_version, ioptions);
ioptions);
// Insert uncompressed block into block cache // Insert uncompressed block into block cache
if (s.ok()) { if (s.ok()) {
@ -1183,10 +1181,11 @@ Status BlockBasedTable::PutDataBlockToCache(
BlockContents contents; BlockContents contents;
Statistics* statistics = ioptions.statistics; Statistics* statistics = ioptions.statistics;
if (raw_block->compression_type() != kNoCompression) { if (raw_block->compression_type() != kNoCompression) {
UncompressionContext uncompression_ctx(raw_block->compression_type(), compression_dict); UncompressionContext uncompression_ctx(raw_block->compression_type(),
compression_dict);
s = UncompressBlockContents(uncompression_ctx, raw_block->data(), s = UncompressBlockContents(uncompression_ctx, raw_block->data(),
raw_block->size(), &contents, raw_block->size(), &contents, format_version,
format_version, ioptions); ioptions);
} }
if (!s.ok()) { if (!s.ok()) {
delete raw_block; delete raw_block;

@ -226,10 +226,9 @@ Status BlockFetcher::ReadBlockContents() {
if (do_uncompress_ && compression_type != kNoCompression) { if (do_uncompress_ && compression_type != kNoCompression) {
// compressed page, uncompress, update cache // compressed page, uncompress, update cache
UncompressionContext uncompression_ctx(compression_type, compression_dict_); UncompressionContext uncompression_ctx(compression_type, compression_dict_);
status_ = UncompressBlockContents(uncompression_ctx, status_ =
slice_.data(), block_size_, contents_, UncompressBlockContents(uncompression_ctx, slice_.data(), block_size_,
footer_.version(), contents_, footer_.version(), ioptions_);
ioptions_);
} else { } else {
GetBlockContents(); GetBlockContents();
} }

@ -264,12 +264,14 @@ Status ReadFooterFromFile(RandomAccessFileReader* file,
return Status::OK(); return Status::OK();
} }
Status UncompressBlockContentsForCompressionType(const UncompressionContext& uncompression_ctx, Status UncompressBlockContentsForCompressionType(
const char* data, size_t n, BlockContents* contents, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
uint32_t format_version, const ImmutableCFOptions &ioptions) { BlockContents* contents, uint32_t format_version,
const ImmutableCFOptions& ioptions) {
std::unique_ptr<char[]> ubuf; std::unique_ptr<char[]> ubuf;
assert(uncompression_ctx.type() != kNoCompression && "Invalid compression type"); assert(uncompression_ctx.type() != kNoCompression &&
"Invalid compression type");
StopWatchNano timer(ioptions.env, StopWatchNano timer(ioptions.env,
ShouldReportDetailedTime(ioptions.env, ioptions.statistics)); ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
@ -290,8 +292,8 @@ Status UncompressBlockContentsForCompressionType(const UncompressionContext& unc
break; break;
} }
case kZlibCompression: case kZlibCompression:
ubuf.reset(Zlib_Uncompress(uncompression_ctx, ubuf.reset(Zlib_Uncompress(
data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kZlibCompression, format_version))); GetCompressFormatForVersion(kZlibCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char zlib_corrupt_msg[] = static char zlib_corrupt_msg[] =
@ -314,8 +316,8 @@ Status UncompressBlockContentsForCompressionType(const UncompressionContext& unc
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4Compression: case kLZ4Compression:
ubuf.reset(LZ4_Uncompress(uncompression_ctx, ubuf.reset(LZ4_Uncompress(
data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4Compression, format_version))); GetCompressFormatForVersion(kLZ4Compression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4_corrupt_msg[] = static char lz4_corrupt_msg[] =
@ -326,8 +328,8 @@ Status UncompressBlockContentsForCompressionType(const UncompressionContext& unc
BlockContents(std::move(ubuf), decompress_size, true, kNoCompression); BlockContents(std::move(ubuf), decompress_size, true, kNoCompression);
break; break;
case kLZ4HCCompression: case kLZ4HCCompression:
ubuf.reset(LZ4_Uncompress(uncompression_ctx, ubuf.reset(LZ4_Uncompress(
data, n, &decompress_size, uncompression_ctx, data, n, &decompress_size,
GetCompressFormatForVersion(kLZ4HCCompression, format_version))); GetCompressFormatForVersion(kLZ4HCCompression, format_version)));
if (!ubuf) { if (!ubuf) {
static char lz4hc_corrupt_msg[] = static char lz4hc_corrupt_msg[] =
@ -386,8 +388,7 @@ Status UncompressBlockContents(const UncompressionContext& uncompression_ctx,
assert(data[n] != kNoCompression); assert(data[n] != kNoCompression);
assert(data[n] == uncompression_ctx.type()); assert(data[n] == uncompression_ctx.type());
return UncompressBlockContentsForCompressionType( return UncompressBlockContentsForCompressionType(
uncompression_ctx, data, n, contents, uncompression_ctx, data, n, contents, format_version, ioptions);
format_version, ioptions);
} }
} // namespace rocksdb } // namespace rocksdb

@ -228,19 +228,17 @@ extern Status ReadBlockContents(
// free this buffer. // free this buffer.
// For description of compress_format_version and possible values, see // For description of compress_format_version and possible values, see
// util/compression.h // util/compression.h
extern Status UncompressBlockContents(const UncompressionContext& uncompression_ctx, extern Status UncompressBlockContents(
const char* data, size_t n, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
BlockContents* contents, BlockContents* contents, uint32_t compress_format_version,
uint32_t compress_format_version,
const ImmutableCFOptions& ioptions); const ImmutableCFOptions& ioptions);
// This is an extension to UncompressBlockContents that accepts // This is an extension to UncompressBlockContents that accepts
// a specific compression type. This is used by un-wrapped blocks // a specific compression type. This is used by un-wrapped blocks
// with no compression header. // with no compression header.
extern Status UncompressBlockContentsForCompressionType( extern Status UncompressBlockContentsForCompressionType(
const UncompressionContext& uncompression_ctx, const UncompressionContext& uncompression_ctx, const char* data, size_t n,
const char* data, size_t n, BlockContents* contents, BlockContents* contents, uint32_t compress_format_version,
uint32_t compress_format_version,
const ImmutableCFOptions& ioptions); const ImmutableCFOptions& ioptions);
// Implementation details follow. Clients should ignore, // Implementation details follow. Clients should ignore,

@ -1951,32 +1951,32 @@ class Benchmark {
bool ok = true; bool ok = true;
switch (FLAGS_compression_type_e) { switch (FLAGS_compression_type_e) {
case rocksdb::kSnappyCompression: case rocksdb::kSnappyCompression:
ok = Snappy_Compress(compression_ctx, input.data(), ok = Snappy_Compress(compression_ctx, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
ok = Zlib_Compress(compression_ctx, 2, input.data(), ok = Zlib_Compress(compression_ctx, 2, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
ok = BZip2_Compress(compression_ctx, 2, input.data(), ok = BZip2_Compress(compression_ctx, 2, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
ok = LZ4_Compress(compression_ctx, 2, input.data(), ok = LZ4_Compress(compression_ctx, 2, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
ok = LZ4HC_Compress(compression_ctx, 2, input.data(), ok = LZ4HC_Compress(compression_ctx, 2, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
case rocksdb::kXpressCompression: case rocksdb::kXpressCompression:
ok = XPRESS_Compress(input.data(), ok = XPRESS_Compress(input.data(),
input.size(), compressed); input.size(), compressed);
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
ok = ZSTD_Compress(compression_ctx, input.data(), ok = ZSTD_Compress(compression_ctx, input.data(), input.size(),
input.size(), compressed); compressed);
break; break;
default: default:
ok = false; ok = false;
@ -2058,8 +2058,10 @@ class Benchmark {
const int len = FLAGS_block_size; const int len = FLAGS_block_size;
std::string input_str(len, 'y'); std::string input_str(len, 'y');
std::string compressed; std::string compressed;
CompressionContext compression_ctx(FLAGS_compression_type_e, Options().compression_opts); CompressionContext compression_ctx(FLAGS_compression_type_e,
bool result = CompressSlice(compression_ctx, Slice(input_str), &compressed); Options().compression_opts);
bool result =
CompressSlice(compression_ctx, Slice(input_str), &compressed);
if (!result) { if (!result) {
fprintf(stdout, "WARNING: %s compression is not enabled\n", fprintf(stdout, "WARNING: %s compression is not enabled\n",
@ -2904,8 +2906,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
} }
case rocksdb::kZlibCompression: case rocksdb::kZlibCompression:
uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(), uncompressed = Zlib_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), compressed.size(), &decompress_size, 2);
&decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kBZip2Compression: case rocksdb::kBZip2Compression:
@ -2915,14 +2916,12 @@ void VerifyDBFromDB(std::string& truth_db_name) {
break; break;
case rocksdb::kLZ4Compression: case rocksdb::kLZ4Compression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), compressed.size(), &decompress_size, 2);
&decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kLZ4HCCompression: case rocksdb::kLZ4HCCompression:
uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(), uncompressed = LZ4_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), compressed.size(), &decompress_size, 2);
&decompress_size, 2);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
case rocksdb::kXpressCompression: case rocksdb::kXpressCompression:
@ -2932,8 +2931,7 @@ void VerifyDBFromDB(std::string& truth_db_name) {
break; break;
case rocksdb::kZSTD: case rocksdb::kZSTD:
uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(), uncompressed = ZSTD_Uncompress(uncompression_ctx, compressed.data(),
compressed.size(), compressed.size(), &decompress_size);
&decompress_size);
ok = uncompressed != nullptr; ok = uncompressed != nullptr;
break; break;
default: default:

@ -219,7 +219,8 @@ DEFINE_int32(level0_stop_writes_trigger,
rocksdb::Options().level0_stop_writes_trigger, rocksdb::Options().level0_stop_writes_trigger,
"Number of files in level-0 that will trigger put stop."); "Number of files in level-0 that will trigger put stop.");
DEFINE_int32(block_size, rocksdb::BlockBasedTableOptions().block_size, DEFINE_int32(block_size,
static_cast<int32_t>(rocksdb::BlockBasedTableOptions().block_size),
"Number of bytes in a block."); "Number of bytes in a block.");
DEFINE_int32( DEFINE_int32(

@ -62,26 +62,24 @@ public:
// Init from cache // Init from cache
ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete; ZSTDUncompressCachedData(const ZSTDUncompressCachedData& o) = delete;
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT : ZSTDUncompressCachedData(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT
ZSTDUncompressCachedData() { : ZSTDUncompressCachedData() {
*this = std::move(o); *this = std::move(o);
} }
ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o) ROCKSDB_NOEXCEPT { ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&& o)
ROCKSDB_NOEXCEPT {
assert(zstd_ctx_ == nullptr); assert(zstd_ctx_ == nullptr);
std::swap(zstd_ctx_, o.zstd_ctx_); std::swap(zstd_ctx_, o.zstd_ctx_);
std::swap(cache_idx_, o.cache_idx_); std::swap(cache_idx_, o.cache_idx_);
return *this; return *this;
} }
ZSTDNativeContext Get() const { ZSTDNativeContext Get() const { return zstd_ctx_; }
return zstd_ctx_; int64_t GetCacheIndex() const { return cache_idx_; }
}
int64_t GetCacheIndex() const {
return cache_idx_;
}
void CreateIfNeeded() { void CreateIfNeeded() {
if (zstd_ctx_ == nullptr) { if (zstd_ctx_ == nullptr) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides()); zstd_ctx_ =
ZSTD_createDCtx_advanced(port::GetJeZstdAllocationOverrides());
#else // ROCKSDB_ZSTD_CUSTOM_MEM #else // ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createDCtx(); zstd_ctx_ = ZSTD_createDCtx();
#endif // ROCKSDB_ZSTD_CUSTOM_MEM #endif // ROCKSDB_ZSTD_CUSTOM_MEM
@ -97,6 +95,7 @@ public:
ZSTD_freeDCtx(zstd_ctx_); ZSTD_freeDCtx(zstd_ctx_);
} }
} }
private: private:
ZSTDNativeContext zstd_ctx_ = nullptr; ZSTDNativeContext zstd_ctx_ = nullptr;
int64_t cache_idx_ = -1; // -1 means this instance owns the context int64_t cache_idx_ = -1; // -1 means this instance owns the context
@ -114,14 +113,12 @@ public:
ZSTDUncompressCachedData() {} ZSTDUncompressCachedData() {}
ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {} ZSTDUncompressCachedData(const ZSTDUncompressCachedData&) {}
ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete; ZSTDUncompressCachedData& operator=(const ZSTDUncompressCachedData&) = delete;
ZSTDUncompressCachedData(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; ZSTDUncompressCachedData(ZSTDUncompressCachedData&&)
ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&) ROCKSDB_NOEXCEPT = default; ROCKSDB_NOEXCEPT = default;
ZSTDNativeContext Get() const { ZSTDUncompressCachedData& operator=(ZSTDUncompressCachedData&&)
return nullptr; ROCKSDB_NOEXCEPT = default;
} ZSTDNativeContext Get() const { return nullptr; }
int64_t GetCacheIndex() const { int64_t GetCacheIndex() const { return -1; }
return -1;
}
void CreateIfNeeded() {} void CreateIfNeeded() {}
void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {} void InitFromCache(const ZSTDUncompressCachedData&, int64_t) {}
}; };
@ -145,7 +142,8 @@ private:
void CreateNativeContext() { void CreateNativeContext() {
if (type_ == kZSTD) { if (type_ == kZSTD) {
#ifdef ROCKSDB_ZSTD_CUSTOM_MEM #ifdef ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides()); zstd_ctx_ =
ZSTD_createCCtx_advanced(port::GetJeZstdAllocationOverrides());
#else // ROCKSDB_ZSTD_CUSTOM_MEM #else // ROCKSDB_ZSTD_CUSTOM_MEM
zstd_ctx_ = ZSTD_createCCtx(); zstd_ctx_ = ZSTD_createCCtx();
#endif // ROCKSDB_ZSTD_CUSTOM_MEM #endif // ROCKSDB_ZSTD_CUSTOM_MEM
@ -156,6 +154,7 @@ private:
ZSTD_freeCCtx(zstd_ctx_); ZSTD_freeCCtx(zstd_ctx_);
} }
} }
public: public:
// callable inside ZSTD_Compress // callable inside ZSTD_Compress
ZSTD_CCtx* ZSTDPreallocCtx() const { ZSTD_CCtx* ZSTDPreallocCtx() const {
@ -168,36 +167,22 @@ private:
void DestroyNativeContext() {} void DestroyNativeContext() {}
#endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500) #endif // ZSTD && (ZSTD_VERSION_NUMBER >= 500)
public: public:
explicit CompressionContext(CompressionType comp_type) : explicit CompressionContext(CompressionType comp_type) : type_(comp_type) {
type_(comp_type) {
CreateNativeContext(); CreateNativeContext();
} }
CompressionContext(CompressionType comp_type, CompressionContext(CompressionType comp_type, const CompressionOptions& opts,
const CompressionOptions& opts, const Slice& comp_dict = Slice())
const Slice& comp_dict = Slice()) : : type_(comp_type), opts_(opts), dict_(comp_dict) {
type_(comp_type),
opts_(opts),
dict_(comp_dict) {
CreateNativeContext(); CreateNativeContext();
} }
~CompressionContext() { ~CompressionContext() { DestroyNativeContext(); }
DestroyNativeContext();
}
CompressionContext(const CompressionContext&) = delete; CompressionContext(const CompressionContext&) = delete;
CompressionContext& operator=(const CompressionContext&) = delete; CompressionContext& operator=(const CompressionContext&) = delete;
const CompressionOptions& options() const { const CompressionOptions& options() const { return opts_; }
return opts_; CompressionType type() const { return type_; }
} const Slice& dict() const { return dict_; }
CompressionType type() const { Slice& dict() { return dict_; }
return type_;
}
const Slice& dict() const {
return dict_;
}
Slice& dict() {
return dict_;
}
}; };
// Instantiate this class and pass it to the uncompression API below // Instantiate this class and pass it to the uncompression API below
@ -207,25 +192,22 @@ private:
Slice dict_; Slice dict_;
CompressionContextCache* ctx_cache_ = nullptr; CompressionContextCache* ctx_cache_ = nullptr;
ZSTDUncompressCachedData uncomp_cached_data_; ZSTDUncompressCachedData uncomp_cached_data_;
public: public:
struct NoCache {}; struct NoCache {};
// Do not use context cache, used by TableBuilder // Do not use context cache, used by TableBuilder
UncompressionContext(NoCache, CompressionType comp_type) : UncompressionContext(NoCache, CompressionType comp_type) : type_(comp_type) {}
type_(comp_type) { explicit UncompressionContext(CompressionType comp_type)
} : UncompressionContext(comp_type, Slice()) {}
explicit UncompressionContext(CompressionType comp_type) : UncompressionContext(CompressionType comp_type, const Slice& comp_dict)
UncompressionContext(comp_type, Slice()) { : type_(comp_type), dict_(comp_dict) {
}
UncompressionContext(CompressionType comp_type, const Slice& comp_dict) :
type_(comp_type), dict_(comp_dict) {
if (type_ == kZSTD) { if (type_ == kZSTD) {
ctx_cache_ = CompressionContextCache::Instance(); ctx_cache_ = CompressionContextCache::Instance();
uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData(); uncomp_cached_data_ = ctx_cache_->GetCachedZSTDUncompressData();
} }
} }
~UncompressionContext() { ~UncompressionContext() {
if (type_ == kZSTD && if (type_ == kZSTD && uncomp_cached_data_.GetCacheIndex() != -1) {
uncomp_cached_data_.GetCacheIndex() != -1) {
assert(ctx_cache_ != nullptr); assert(ctx_cache_ != nullptr);
ctx_cache_->ReturnCachedZSTDUncompressData( ctx_cache_->ReturnCachedZSTDUncompressData(
uncomp_cached_data_.GetCacheIndex()); uncomp_cached_data_.GetCacheIndex());
@ -237,15 +219,9 @@ public:
ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const { ZSTDUncompressCachedData::ZSTDNativeContext GetZSTDContext() const {
return uncomp_cached_data_.Get(); return uncomp_cached_data_.Get();
} }
CompressionType type() const { CompressionType type() const { return type_; }
return type_; const Slice& dict() const { return dict_; }
} Slice& dict() { return dict_; }
const Slice& dict() const {
return dict_;
}
Slice& dict() {
return dict_;
}
}; };
inline bool Snappy_Supported() { inline bool Snappy_Supported() {
@ -471,8 +447,8 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
if (ctx.dict().size()) { if (ctx.dict().size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = deflateSetDictionary( st = deflateSetDictionary(&_stream,
&_stream, reinterpret_cast<const Bytef*>(ctx.dict().data()), reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(ctx.dict().size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
deflateEnd(&_stream); deflateEnd(&_stream);
@ -516,8 +492,8 @@ inline bool Zlib_Compress(const CompressionContext& ctx,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_data, inline char* Zlib_Uncompress(const UncompressionContext& ctx,
size_t input_length, const char* input_data, size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version, uint32_t compress_format_version,
int windowBits = -14) { int windowBits = -14) {
@ -551,8 +527,8 @@ inline char* Zlib_Uncompress(const UncompressionContext& ctx, const char* input_
if (ctx.dict().size()) { if (ctx.dict().size()) {
// Initialize the compression library's dictionary // Initialize the compression library's dictionary
st = inflateSetDictionary( st = inflateSetDictionary(&_stream,
&_stream, reinterpret_cast<const Bytef*>(ctx.dict().data()), reinterpret_cast<const Bytef*>(ctx.dict().data()),
static_cast<unsigned int>(ctx.dict().size())); static_cast<unsigned int>(ctx.dict().size()));
if (st != Z_OK) { if (st != Z_OK) {
return nullptr; return nullptr;
@ -835,8 +811,8 @@ inline bool LZ4_Compress(const CompressionContext& ctx,
// header in varint32 format // header in varint32 format
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* LZ4_Uncompress(const UncompressionContext& ctx, const char* input_data, inline char* LZ4_Uncompress(const UncompressionContext& ctx,
size_t input_length, const char* input_data, size_t input_length,
int* decompress_size, int* decompress_size,
uint32_t compress_format_version) { uint32_t compress_format_version) {
#ifdef LZ4 #ifdef LZ4
@ -1026,8 +1002,8 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
#if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+ #if ZSTD_VERSION_NUMBER >= 500 // v0.5.0+
ZSTD_CCtx* context = ctx.ZSTDPreallocCtx(); ZSTD_CCtx* context = ctx.ZSTDPreallocCtx();
assert(context != nullptr); assert(context != nullptr);
outlen = ZSTD_compress_usingDict( outlen = ZSTD_compress_usingDict(context, &(*output)[output_header_len],
context, &(*output)[output_header_len], compressBound, input, length, compressBound, input, length,
ctx.dict().data(), ctx.dict().size(), level); ctx.dict().data(), ctx.dict().size(), level);
#else // up to v0.4.x #else // up to v0.4.x
outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input, outlen = ZSTD_compress(&(*output)[output_header_len], compressBound, input,
@ -1049,8 +1025,8 @@ inline bool ZSTD_Compress(const CompressionContext& ctx, const char* input,
// @param compression_dict Data for presetting the compression library's // @param compression_dict Data for presetting the compression library's
// dictionary. // dictionary.
inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_data, inline char* ZSTD_Uncompress(const UncompressionContext& ctx,
size_t input_length, const char* input_data, size_t input_length,
int* decompress_size) { int* decompress_size) {
#ifdef ZSTD #ifdef ZSTD
uint32_t output_len = 0; uint32_t output_len = 0;
@ -1065,8 +1041,8 @@ inline char* ZSTD_Uncompress(const UncompressionContext& ctx, const char* input_
ZSTD_DCtx* context = ctx.GetZSTDContext(); ZSTD_DCtx* context = ctx.GetZSTDContext();
assert(context != nullptr); assert(context != nullptr);
actual_output_length = ZSTD_decompress_usingDict( actual_output_length = ZSTD_decompress_usingDict(
context, output, output_len, input_data, input_length, context, output, output_len, input_data, input_length, ctx.dict().data(),
ctx.dict().data(), ctx.dict().size()); ctx.dict().size());
#else // up to v0.4.x #else // up to v0.4.x
actual_output_length = actual_output_length =
ZSTD_decompress(output, output_len, input_data, input_length); ZSTD_decompress(output, output_len, input_data, input_length);

@ -29,9 +29,9 @@ struct ZSTDCachedData {
ZSTDUncompressCachedData uncomp_cached_data_; ZSTDUncompressCachedData uncomp_cached_data_;
std::atomic<void*> zstd_uncomp_sentinel_; std::atomic<void*> zstd_uncomp_sentinel_;
char padding[(CACHE_LINE_SIZE - char
(sizeof(ZSTDUncompressCachedData) + padding[(CACHE_LINE_SIZE -
sizeof(std::atomic<void*>)) % (sizeof(ZSTDUncompressCachedData) + sizeof(std::atomic<void*>)) %
CACHE_LINE_SIZE)]; // unused padding field CACHE_LINE_SIZE)]; // unused padding field
ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {} ZSTDCachedData() : zstd_uncomp_sentinel_(&uncomp_cached_data_) {}
@ -41,7 +41,8 @@ struct ZSTDCachedData {
ZSTDUncompressCachedData GetUncompressData(int64_t idx) { ZSTDUncompressCachedData GetUncompressData(int64_t idx) {
ZSTDUncompressCachedData result; ZSTDUncompressCachedData result;
void* expected = &uncomp_cached_data_; void* expected = &uncomp_cached_data_;
if (zstd_uncomp_sentinel_.compare_exchange_strong(expected, SentinelValue)) { if (zstd_uncomp_sentinel_.compare_exchange_strong(expected,
SentinelValue)) {
uncomp_cached_data_.CreateIfNeeded(); uncomp_cached_data_.CreateIfNeeded();
result.InitFromCache(uncomp_cached_data_, idx); result.InitFromCache(uncomp_cached_data_, idx);
} else { } else {
@ -60,15 +61,15 @@ struct ZSTDCachedData {
} }
} }
}; };
static_assert(sizeof(ZSTDCachedData) % CACHE_LINE_SIZE == 0, "Expected CACHE_LINE_SIZE alignment"); static_assert(sizeof(ZSTDCachedData) % CACHE_LINE_SIZE == 0,
} // compression_cache "Expected CACHE_LINE_SIZE alignment");
} // namespace compression_cache
using namespace compression_cache; using namespace compression_cache;
class CompressionContextCache::Rep { class CompressionContextCache::Rep {
public: public:
Rep() { Rep() {}
}
ZSTDUncompressCachedData GetZSTDUncompressData() { ZSTDUncompressCachedData GetZSTDUncompressData() {
auto p = per_core_uncompr_.AccessElementAndIndex(); auto p = per_core_uncompr_.AccessElementAndIndex();
int64_t idx = static_cast<int64_t>(p.second); int64_t idx = static_cast<int64_t>(p.second);
@ -79,24 +80,22 @@ public:
auto* cn = per_core_uncompr_.AccessAtCore(static_cast<size_t>(idx)); auto* cn = per_core_uncompr_.AccessAtCore(static_cast<size_t>(idx));
cn->ReturnUncompressData(); cn->ReturnUncompressData();
} }
private: private:
CoreLocalArray<ZSTDCachedData> per_core_uncompr_; CoreLocalArray<ZSTDCachedData> per_core_uncompr_;
}; };
CompressionContextCache::CompressionContextCache() : CompressionContextCache::CompressionContextCache() : rep_(new Rep()) {}
rep_(new Rep()) {
}
CompressionContextCache* CompressionContextCache::Instance() { CompressionContextCache* CompressionContextCache::Instance() {
static CompressionContextCache instance; static CompressionContextCache instance;
return &instance; return &instance;
} }
void CompressionContextCache::InitSingleton() { void CompressionContextCache::InitSingleton() { Instance(); }
Instance();
}
ZSTDUncompressCachedData CompressionContextCache::GetCachedZSTDUncompressData() { ZSTDUncompressCachedData
CompressionContextCache::GetCachedZSTDUncompressData() {
return rep_->GetZSTDUncompressData(); return rep_->GetZSTDUncompressData();
} }
@ -104,8 +103,6 @@ void CompressionContextCache::ReturnCachedZSTDUncompressData(int64_t idx) {
rep_->ReturnZSTDUncompressData(idx); rep_->ReturnZSTDUncompressData(idx);
} }
CompressionContextCache::~CompressionContextCache() { CompressionContextCache::~CompressionContextCache() { delete rep_; }
delete rep_;
}
} } // namespace rocksdb

@ -11,9 +11,9 @@
// This helps with Random Read latencies and reduces CPU utilization // This helps with Random Read latencies and reduces CPU utilization
// Caching is implemented using CoreLocal facility. Compression/Uncompression // Caching is implemented using CoreLocal facility. Compression/Uncompression
// instances are cached on a per core basis using CoreLocalArray. A borrowed // instances are cached on a per core basis using CoreLocalArray. A borrowed
// instance is atomically replaced with a sentinel value for the time of being used. // instance is atomically replaced with a sentinel value for the time of being
// If it turns out that another thread is already makes use of the instance we still // used. If it turns out that another thread is already makes use of the
// create one on the heap which is later is destroyed. // instance we still create one on the heap which is later is destroyed.
#pragma once #pragma once
@ -42,4 +42,4 @@ private:
Rep* rep_; Rep* rep_;
}; };
} } // namespace rocksdb

@ -1122,10 +1122,8 @@ Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
BLOB_DB_DECOMPRESSION_MICROS); BLOB_DB_DECOMPRESSION_MICROS);
UncompressionContext uncompression_ctx(bfile->compression()); UncompressionContext uncompression_ctx(bfile->compression());
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
uncompression_ctx, uncompression_ctx, blob_value.data(), blob_value.size(), &contents,
blob_value.data(), blob_value.size(), &contents, kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
kBlockBasedTableVersionFormat,
*(cfh->cfd()->ioptions()));
} }
value->PinSelf(contents.data); value->PinSelf(contents.data);
} }

@ -207,10 +207,8 @@ Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
BlockContents contents; BlockContents contents;
UncompressionContext uncompression_ctx(compression); UncompressionContext uncompression_ctx(compression);
s = UncompressBlockContentsForCompressionType( s = UncompressBlockContentsForCompressionType(
uncompression_ctx, uncompression_ctx, slice.data() + key_size, value_size, &contents,
slice.data() + key_size, value_size, &contents, 2 /*compress_format_version*/, ImmutableCFOptions(Options()));
2 /*compress_format_version*/,
ImmutableCFOptions(Options()));
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }

@ -173,8 +173,7 @@ void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) { if (type != kNoCompression) {
UncompressionContext uncompression_ctx(type); UncompressionContext uncompression_ctx(type);
UncompressBlockContents(uncompression_ctx, UncompressBlockContents(uncompression_ctx, slice_final_with_bit.c_str(),
slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents, slice_final_with_bit.size() - 1, &contents,
format_version, ioptions); format_version, ioptions);
decoded_content = std::string(contents.data.data(), contents.data.size()); decoded_content = std::string(contents.data.data(), contents.data.size());
@ -247,9 +246,8 @@ void CompressDataBlock(const std::string& output_content, Slice* slice_final,
CompressionType* type, std::string* compressed_output) { CompressionType* type, std::string* compressed_output) {
CompressionContext compression_ctx(*type); CompressionContext compression_ctx(*type);
uint32_t format_version = 2; // hard-coded version uint32_t format_version = 2; // hard-coded version
*slice_final = *slice_final = CompressBlock(output_content, compression_ctx, type,
CompressBlock(output_content, compression_ctx, type, format_version, format_version, compressed_output);
compressed_output);
} }
} // namespace } // namespace

Loading…
Cancel
Save