Streaming Compression API for WAL compression. (#9619)

Summary:
Implement a streaming compression API (compress/uncompress) to use for WAL compression. The log_writer would use the compress class/API to compress a record before writing it out in chunks. The log_reader would use the uncompress class/API to uncompress the chunks and combine into a single record.

Added unit test to verify the API for different sizes/compression types.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9619

Test Plan: make -j24 check

Reviewed By: anand1976

Differential Revision: D34437346

Pulled By: sidroyc

fbshipit-source-id: b180569ad2ddcf3106380f8758b556cc0ad18382
main
Siddhartha Roychowdhury 3 years ago committed by Facebook GitHub Bot
parent f706a9c199
commit 21345d2823
  1. 1
      CMakeLists.txt
  2. 2
      TARGETS
  3. 68
      db/log_test.cc
  4. 1
      src.mk
  5. 116
      util/compression.cc
  6. 134
      util/compression.h

@ -830,6 +830,7 @@ set(SOURCES
util/coding.cc util/coding.cc
util/compaction_job_stats_impl.cc util/compaction_job_stats_impl.cc
util/comparator.cc util/comparator.cc
util/compression.cc
util/compression_context_cache.cc util/compression_context_cache.cc
util/concurrent_task_limiter_impl.cc util/concurrent_task_limiter_impl.cc
util/crc32c.cc util/crc32c.cc

@ -227,6 +227,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"util/coding.cc", "util/coding.cc",
"util/compaction_job_stats_impl.cc", "util/compaction_job_stats_impl.cc",
"util/comparator.cc", "util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc", "util/compression_context_cache.cc",
"util/concurrent_task_limiter_impl.cc", "util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc", "util/crc32c.cc",
@ -544,6 +545,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"util/coding.cc", "util/coding.cc",
"util/compaction_job_stats_impl.cc", "util/compaction_job_stats_impl.cc",
"util/comparator.cc", "util/comparator.cc",
"util/compression.cc",
"util/compression_context_cache.cc", "util/compression_context_cache.cc",
"util/concurrent_task_limiter_impl.cc", "util/concurrent_task_limiter_impl.cc",
"util/crc32c.cc", "util/crc32c.cc",

@ -17,6 +17,7 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/crc32c.h" #include "util/crc32c.h"
#include "util/random.h" #include "util/random.h"
#include "utilities/memory_allocators.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
namespace log { namespace log {
@ -918,6 +919,73 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(CompressionType::kNoCompression, ::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD))); CompressionType::kZSTD)));
class StreamingCompressionTest
: public ::testing::TestWithParam<std::tuple<int, CompressionType>> {};
TEST_P(StreamingCompressionTest, Basic) {
size_t input_size = std::get<0>(GetParam());
CompressionType compression_type = std::get<1>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
CompressionOptions opts;
constexpr uint32_t compression_format_version = 2;
StreamingCompress* compress = StreamingCompress::Create(
compression_type, opts, compression_format_version, kBlockSize);
StreamingUncompress* uncompress = StreamingUncompress::Create(
compression_type, compression_format_version, kBlockSize);
MemoryAllocator* allocator = new DefaultMemoryAllocator();
std::string input_buffer = BigString("abc", input_size);
std::vector<std::string> compressed_buffers;
size_t remaining;
// Call compress till the entire input is consumed
do {
char* output_buffer = (char*)allocator->Allocate(kBlockSize);
size_t output_size;
remaining = compress->Compress(input_buffer.c_str(), input_size,
output_buffer, &output_size);
if (output_size > 0) {
std::string compressed_buffer;
compressed_buffer.assign(output_buffer, output_size);
compressed_buffers.emplace_back(std::move(compressed_buffer));
}
allocator->Deallocate((void*)output_buffer);
} while (remaining > 0);
std::string uncompressed_buffer = "";
int ret_val = 0;
size_t output_size;
char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize);
// Uncompress the fragments and concatenate them.
for (int i = 0; i < (int)compressed_buffers.size(); i++) {
// Call uncompress till either the entire input is consumed or the output
// buffer size is equal to the allocated output buffer size.
do {
ret_val = uncompress->Uncompress(
compressed_buffers[i].c_str(), compressed_buffers[i].size(),
uncompressed_output_buffer, &output_size);
if (output_size > 0) {
std::string uncompressed_fragment;
uncompressed_fragment.assign(uncompressed_output_buffer, output_size);
uncompressed_buffer += uncompressed_fragment;
}
} while (ret_val > 0 || output_size == kBlockSize);
}
allocator->Deallocate((void*)uncompressed_output_buffer);
delete allocator;
delete compress;
delete uncompress;
// The final return value from uncompress() should be 0.
ASSERT_EQ(ret_val, 0);
ASSERT_EQ(input_buffer, uncompressed_buffer);
}
INSTANTIATE_TEST_CASE_P(
StreamingCompression, StreamingCompressionTest,
::testing::Combine(::testing::Values(10, 100, 1000, kBlockSize,
kBlockSize * 2),
::testing::Values(CompressionType::kZSTD)));
} // namespace log } // namespace log
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -214,6 +214,7 @@ LIB_SOURCES = \
util/coding.cc \ util/coding.cc \
util/compaction_job_stats_impl.cc \ util/compaction_job_stats_impl.cc \
util/comparator.cc \ util/comparator.cc \
util/compression.cc \
util/compression_context_cache.cc \ util/compression_context_cache.cc \
util/concurrent_task_limiter_impl.cc \ util/concurrent_task_limiter_impl.cc \
util/crc32c.cc \ util/crc32c.cc \

@ -0,0 +1,116 @@
// Copyright (c) 2022-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
StreamingCompress* StreamingCompress::Create(CompressionType compression_type,
const CompressionOptions& opts,
uint32_t compress_format_version,
size_t max_output_len) {
switch (compression_type) {
case kZSTD: {
if (!ZSTD_Streaming_Supported()) {
return nullptr;
}
return new ZSTDStreamingCompress(opts, compress_format_version,
max_output_len);
}
default:
return nullptr;
}
}
StreamingUncompress* StreamingUncompress::Create(
CompressionType compression_type, uint32_t compress_format_version,
size_t max_output_len) {
switch (compression_type) {
case kZSTD: {
if (!ZSTD_Streaming_Supported()) {
return nullptr;
}
return new ZSTDStreamingUncompress(compress_format_version,
max_output_len);
}
default:
return nullptr;
}
}
int ZSTDStreamingCompress::Compress(const char* input, size_t input_size,
char* output, size_t* output_size) {
assert(input != nullptr && output != nullptr && input_size > 0 &&
output_size != nullptr);
*output_size = 0;
#ifndef ZSTD_STREAMING
(void)input;
(void)input_size;
(void)output;
return -1;
#else
if (input_buffer_.src == nullptr || input_buffer_.src != input) {
// New input
// Catch errors where the previous input was not fully decompressed.
assert(input_buffer_.pos == input_buffer_.size);
input_buffer_ = {input, input_size, /*pos=*/0};
} else if (input_buffer_.src == input) {
// Same input, not fully compressed.
}
ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
const size_t remaining =
ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_flush);
if (ZSTD_isError(remaining)) {
// Failure
Reset();
return -1;
}
// Success
*output_size = output_buffer.pos;
return (int)(input_buffer_.size - input_buffer_.pos);
#endif
}
void ZSTDStreamingCompress::Reset() {
#ifdef ZSTD_STREAMING
ZSTD_CCtx_reset(cctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
char* output, size_t* output_size) {
assert(input != nullptr && output != nullptr && input_size > 0 &&
output_size != nullptr);
*output_size = 0;
#ifdef ZSTD_STREAMING
if (input_buffer_.src != input) {
// New input
input_buffer_ = {input, input_size, /*pos=*/0};
}
ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0};
size_t ret = ZSTD_decompressStream(dctx_, &output_buffer, &input_buffer_);
if (ZSTD_isError(ret)) {
Reset();
return -1;
}
*output_size = output_buffer.pos;
return (int)(input_buffer_.size - input_buffer_.pos);
#else
(void)input;
(void)input_size;
(void)output;
return -1;
#endif
}
void ZSTDStreamingUncompress::Reset() {
#ifdef ZSTD_STREAMING
ZSTD_DCtx_reset(dctx_, ZSTD_ResetDirective::ZSTD_reset_session_only);
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}
} // namespace ROCKSDB_NAMESPACE

@ -49,6 +49,7 @@
#include <zstd.h> #include <zstd.h>
#if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+ #if ZSTD_VERSION_NUMBER >= 10103 // v1.1.3+
#include <zdict.h> #include <zdict.h>
#define ZSTD_STREAMING
#endif // ZSTD_VERSION_NUMBER >= 10103 #endif // ZSTD_VERSION_NUMBER >= 10103
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// Need this for the context allocation override // Need this for the context allocation override
@ -1593,4 +1594,137 @@ class CompressionTypeRecord {
CompressionType compression_type_; CompressionType compression_type_;
}; };
// Base class to implement compression for a stream of buffers.
// Instantiate an implementation of the class using Create() with the
// compression type and use Compress() repeatedly.
// The output buffer needs to be at least max_output_len.
// Call Reset() in between frame boundaries or in case of an error.
// NOTE: This class is not thread safe.
class StreamingCompress {
public:
StreamingCompress(CompressionType compression_type,
const CompressionOptions& opts,
uint32_t compress_format_version, size_t max_output_len)
: compression_type_(compression_type),
opts_(opts),
compress_format_version_(compress_format_version),
max_output_len_(max_output_len) {}
virtual ~StreamingCompress() = default;
// compress should be called repeatedly with the same input till the method
// returns 0
// Parameters:
// input - buffer to compress
// input_size - size of input buffer
// output - compressed buffer allocated by caller, should be at least
// max_output_len
// output_size - size of the output buffer
// Returns -1 for errors, the remaining size of the input buffer that needs to
// be compressed
virtual int Compress(const char* input, size_t input_size, char* output,
size_t* output_size) = 0;
// static method to create object of a class inherited from StreamingCompress
// based on the actual compression type.
static StreamingCompress* Create(CompressionType compression_type,
const CompressionOptions& opts,
uint32_t compress_format_version,
size_t max_output_len);
virtual void Reset() = 0;
protected:
const CompressionType compression_type_;
const CompressionOptions opts_;
const uint32_t compress_format_version_;
const size_t max_output_len_;
};
// Base class to uncompress a stream of compressed buffers.
// Instantiate an implementation of the class using Create() with the
// compression type and use Uncompress() repeatedly.
// The output buffer needs to be at least max_output_len.
// Call Reset() in between frame boundaries or in case of an error.
// NOTE: This class is not thread safe.
class StreamingUncompress {
public:
StreamingUncompress(CompressionType compression_type,
uint32_t compress_format_version, size_t max_output_len)
: compression_type_(compression_type),
compress_format_version_(compress_format_version),
max_output_len_(max_output_len) {}
virtual ~StreamingUncompress() = default;
// uncompress should be called again with the same input if output_size is
// equal to max_output_len or with the next input fragment.
// Parameters:
// input - buffer to uncompress
// input_size - size of input buffer
// output - uncompressed buffer allocated by caller, should be at least
// max_output_len
// output_size - size of the output buffer
// Returns -1 for errors, remaining input to be processed otherwise.
virtual int Uncompress(const char* input, size_t input_size, char* output,
size_t* output_size) = 0;
static StreamingUncompress* Create(CompressionType compression_type,
uint32_t compress_format_version,
size_t max_output_len);
virtual void Reset() = 0;
protected:
CompressionType compression_type_;
uint32_t compress_format_version_;
size_t max_output_len_;
};
class ZSTDStreamingCompress final : public StreamingCompress {
public:
explicit ZSTDStreamingCompress(const CompressionOptions& opts,
uint32_t compress_format_version,
size_t max_output_len)
: StreamingCompress(kZSTD, opts, compress_format_version,
max_output_len) {
#ifdef ZSTD_STREAMING
cctx_ = ZSTD_createCCtx();
assert(cctx_ != nullptr);
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}
~ZSTDStreamingCompress() override {
#ifdef ZSTD_STREAMING
ZSTD_freeCCtx(cctx_);
#endif
}
int Compress(const char* input, size_t input_size, char* output,
size_t* output_size) override;
void Reset() override;
#ifdef ZSTD_STREAMING
ZSTD_CCtx* cctx_;
ZSTD_inBuffer input_buffer_;
#endif
};
class ZSTDStreamingUncompress final : public StreamingUncompress {
public:
explicit ZSTDStreamingUncompress(uint32_t compress_format_version,
size_t max_output_len)
: StreamingUncompress(kZSTD, compress_format_version, max_output_len) {
#ifdef ZSTD_STREAMING
dctx_ = ZSTD_createDCtx();
assert(dctx_ != nullptr);
input_buffer_ = {/*src=*/nullptr, /*size=*/0, /*pos=*/0};
#endif
}
~ZSTDStreamingUncompress() override {
#ifdef ZSTD_STREAMING
ZSTD_freeDCtx(dctx_);
#endif
}
int Uncompress(const char* input, size_t input_size, char* output,
size_t* output_size) override;
void Reset() override;
private:
#ifdef ZSTD_STREAMING
ZSTD_DCtx* dctx_;
ZSTD_inBuffer input_buffer_;
#endif
};
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save