From fec4403ff1f8427f54b3020ccda0b412aa106d52 Mon Sep 17 00:00:00 2001 From: Siddhartha Roychowdhury Date: Wed, 9 Mar 2022 15:49:53 -0800 Subject: [PATCH] Integrate WAL compression into log reader/writer. (#9642) Summary: Integrate the streaming compress/uncompress API into WAL compression. The streaming compression object is stored in the log_writer along with a reusable output buffer to store the compressed buffer(s). The streaming uncompress object is stored in the log_reader along with a reusable output buffer to store the uncompressed buffer(s). Pull Request resolved: https://github.com/facebook/rocksdb/pull/9642 Test Plan: Added unit tests to verify different scenarios - large buffers, split compressed buffers, etc. Future optimizations: The overhead for small records is quite high, so it makes sense to compress only buffers above a certain threshold and use a separate record type to indicate that those records are compressed. Reviewed By: anand1976 Differential Revision: D34709167 Pulled By: sidroyc fbshipit-source-id: a37a3cd1301adff6152fb3fcd23726106af07dd4 --- db/log_reader.cc | 86 ++++++++++++++++++++++++++++++++++++++++----- db/log_reader.h | 7 ++++ db/log_test.cc | 78 ++++++++++++++++++++++++++++++++++------ db/log_writer.cc | 53 ++++++++++++++++++++++++++-- db/log_writer.h | 4 +++ util/compression.cc | 30 +++++++++------- util/compression.h | 6 ++-- 7 files changed, 226 insertions(+), 38 deletions(-) diff --git a/db/log_reader.cc b/db/log_reader.cc index d88eb08cb..fbf50d7c1 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -10,6 +10,7 @@ #include "db/log_reader.h" #include + #include "file/sequence_file_reader.h" #include "port/lang.h" #include "rocksdb/env.h" @@ -41,10 +42,14 @@ Reader::Reader(std::shared_ptr info_log, recycled_(false), first_record_read_(false), compression_type_(kNoCompression), - compression_type_record_read_(false) {} + compression_type_record_read_(false), + uncompress_(nullptr) {} Reader::~Reader() { delete[] backing_store_; + if (uncompress_) { + delete uncompress_; + } } // For kAbsoluteConsistency, on clean shutdown we don't expect any error @@ -58,6 +63,9 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode wal_recovery_mode) { scratch->clear(); record->clear(); + if (uncompress_) { + uncompress_->Reset(); + } bool in_fragmented_record = false; // Record offset of the logical record that we're reading // 0 is a dummy value to make compilers happy @@ -235,8 +243,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, ReportCorruption(fragment.size(), "could not decode SetCompressionType record"); } else { - compression_type_ = compression_record.GetCompressionType(); - compression_type_record_read_ = true; + InitCompression(compression_record); } break; } @@ -450,17 +457,54 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size) { buffer_.remove_prefix(header_size + length); - *result = Slice(header + header_size, length); - return type; + if (!uncompress_ || type == kSetCompressionType) { + *result = Slice(header + header_size, length); + return type; + } else { + // Uncompress compressed records + uncompressed_record_.clear(); + size_t uncompressed_size = 0; + int remaining = 0; + do { + remaining = uncompress_->Uncompress(header + header_size, length, + uncompressed_buffer_.get(), + &uncompressed_size); + if (remaining < 0) { + buffer_.clear(); + return kBadRecord; + } + if (uncompressed_size > 0) { + uncompressed_record_.append(uncompressed_buffer_.get(), + uncompressed_size); + } + } while (remaining > 0 || uncompressed_size == kBlockSize); + *result = Slice(uncompressed_record_); + return type; + } } } +// Initialize uncompress related fields +void Reader::InitCompression(const CompressionTypeRecord& compression_record) { + compression_type_ = compression_record.GetCompressionType(); + compression_type_record_read_ = true; + constexpr uint32_t compression_format_version = 2; + uncompress_ = StreamingUncompress::Create( + compression_type_, compression_format_version, kBlockSize); + assert(uncompress_ != nullptr); + uncompressed_buffer_ = std::unique_ptr(new char[kBlockSize]); + assert(uncompressed_buffer_); +} + bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode /*unused*/) { assert(record != nullptr); assert(scratch != nullptr); record->clear(); scratch->clear(); + if (uncompress_) { + uncompress_->Reset(); + } uint64_t prospective_record_offset = 0; uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); @@ -562,7 +606,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, ReportCorruption(fragment.size(), "could not decode SetCompressionType record"); } else { - compression_type_ = compression_record.GetCompressionType(); + InitCompression(compression_record); } break; } @@ -700,9 +744,33 @@ bool FragmentBufferedReader::TryReadFragment( buffer_.remove_prefix(header_size + length); - *fragment = Slice(header + header_size, length); - *fragment_type_or_err = type; - return true; + if (!uncompress_ || type == kSetCompressionType) { + *fragment = Slice(header + header_size, length); + *fragment_type_or_err = type; + return true; + } else { + // Uncompress compressed records + uncompressed_record_.clear(); + size_t uncompressed_size = 0; + int remaining = 0; + do { + remaining = uncompress_->Uncompress(header + header_size, length, + uncompressed_buffer_.get(), + &uncompressed_size); + if (remaining < 0) { + buffer_.clear(); + *fragment_type_or_err = kBadRecord; + return true; + } + if (uncompressed_size > 0) { + uncompressed_record_.append(uncompressed_buffer_.get(), + uncompressed_size); + } + } while (remaining > 0 || uncompressed_size == kBlockSize); + *fragment = Slice(std::move(uncompressed_record_)); + *fragment_type_or_err = type; + return true; + } } } // namespace log diff --git a/db/log_reader.h b/db/log_reader.h index cd2d795c8..6f5b98621 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -136,6 +136,11 @@ class Reader { CompressionType compression_type_; // Track whether the compression type record has been read or not. bool compression_type_record_read_; + StreamingUncompress* uncompress_; + // Reusable uncompressed output buffer + std::unique_ptr uncompressed_buffer_; + // Reusable uncompressed record + std::string uncompressed_record_; // Extend record types with the following special values enum { @@ -167,6 +172,8 @@ class Reader { // buffer_ must be updated to remove the dropped bytes prior to invocation. void ReportCorruption(size_t bytes, const char* reason); void ReportDrop(size_t bytes, const Status& reason); + + void InitCompression(const CompressionTypeRecord& compression_record); }; class FragmentBufferedReader : public Reader { diff --git a/db/log_test.cc b/db/log_test.cc index 59de62d13..c7bc45656 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -904,6 +904,11 @@ class CompressionLogTest : public LogTest { }; TEST_P(CompressionLogTest, Empty) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } ASSERT_OK(SetupTestEnv()); const bool compression_enabled = std::get<2>(GetParam()) == kNoCompression ? false : true; @@ -913,6 +918,57 @@ TEST_P(CompressionLogTest, Empty) { ASSERT_EQ("EOF", Read()); } +TEST_P(CompressionLogTest, ReadWrite) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + ASSERT_OK(SetupTestEnv()); + Write("foo"); + Write("bar"); + Write(""); + Write("xxxx"); + ASSERT_EQ("foo", Read()); + ASSERT_EQ("bar", Read()); + ASSERT_EQ("", Read()); + ASSERT_EQ("xxxx", Read()); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("EOF", Read()); // Make sure reads at eof work +} + +TEST_P(CompressionLogTest, ManyBlocks) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + ASSERT_OK(SetupTestEnv()); + for (int i = 0; i < 100000; i++) { + Write(NumberString(i)); + } + for (int i = 0; i < 100000; i++) { + ASSERT_EQ(NumberString(i), Read()); + } + ASSERT_EQ("EOF", Read()); +} + +TEST_P(CompressionLogTest, Fragmentation) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + ASSERT_OK(SetupTestEnv()); + Write("small"); + Write(BigString("medium", 50000)); + Write(BigString("large", 100000)); + ASSERT_EQ("small", Read()); + ASSERT_EQ(BigString("medium", 50000), Read()); + ASSERT_EQ(BigString("large", 100000), Read()); + ASSERT_EQ("EOF", Read()); +} + INSTANTIATE_TEST_CASE_P( Compression, CompressionLogTest, ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), @@ -942,34 +998,34 @@ TEST_P(StreamingCompressionTest, Basic) { // Call compress till the entire input is consumed do { char* output_buffer = (char*)allocator->Allocate(kBlockSize); - size_t output_size; + size_t output_pos; remaining = compress->Compress(input_buffer.c_str(), input_size, - output_buffer, &output_size); - if (output_size > 0) { + output_buffer, &output_pos); + if (output_pos > 0) { std::string compressed_buffer; - compressed_buffer.assign(output_buffer, output_size); + compressed_buffer.assign(output_buffer, output_pos); compressed_buffers.emplace_back(std::move(compressed_buffer)); } allocator->Deallocate((void*)output_buffer); } while (remaining > 0); std::string uncompressed_buffer = ""; int ret_val = 0; - size_t output_size; + size_t output_pos; char* uncompressed_output_buffer = (char*)allocator->Allocate(kBlockSize); // Uncompress the fragments and concatenate them. for (int i = 0; i < (int)compressed_buffers.size(); i++) { // Call uncompress till either the entire input is consumed or the output // buffer size is equal to the allocated output buffer size. do { - ret_val = uncompress->Uncompress( - compressed_buffers[i].c_str(), compressed_buffers[i].size(), - uncompressed_output_buffer, &output_size); - if (output_size > 0) { + ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(), + compressed_buffers[i].size(), + uncompressed_output_buffer, &output_pos); + if (output_pos > 0) { std::string uncompressed_fragment; - uncompressed_fragment.assign(uncompressed_output_buffer, output_size); + uncompressed_fragment.assign(uncompressed_output_buffer, output_pos); uncompressed_buffer += uncompressed_fragment; } - } while (ret_val > 0 || output_size == kBlockSize); + } while (ret_val > 0 || output_pos == kBlockSize); } allocator->Deallocate((void*)uncompressed_output_buffer); delete allocator; diff --git a/db/log_writer.cc b/db/log_writer.cc index 77b82950a..807131cfc 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -10,6 +10,7 @@ #include "db/log_writer.h" #include + #include "file/writable_file_writer.h" #include "rocksdb/env.h" #include "util/coding.h" @@ -26,7 +27,8 @@ Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, log_number_(log_number), recycle_log_files_(recycle_log_files), manual_flush_(manual_flush), - compression_type_(compression_type) { + compression_type_(compression_type), + compress_(nullptr) { for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); @@ -37,6 +39,9 @@ Writer::~Writer() { if (dest_) { WriteBuffer().PermitUncheckedError(); } + if (compress_) { + delete compress_; + } } IOStatus Writer::WriteBuffer() { return dest_->Flush(); } @@ -64,6 +69,12 @@ IOStatus Writer::AddRecord(const Slice& slice, // zero-length record IOStatus s; bool begin = true; + int compress_remaining = 0; + bool compress_start = false; + if (compress_) { + compress_->Reset(); + compress_start = true; + } do { const int64_t leftover = kBlockSize - block_offset_; assert(leftover >= 0); @@ -87,10 +98,34 @@ IOStatus Writer::AddRecord(const Slice& slice, assert(static_cast(kBlockSize - block_offset_) >= header_size); const size_t avail = kBlockSize - block_offset_ - header_size; + + // Compress the record if compression is enabled. + // Compress() is called at least once (compress_start=true) and after the + // previous generated compressed chunk is written out as one or more + // physical records (left=0). + if (compress_ && (compress_start || left == 0)) { + compress_remaining = compress_->Compress(slice.data(), slice.size(), + compressed_buffer_.get(), &left); + + if (compress_remaining < 0) { + // Set failure status + s = IOStatus::IOError("Unexpected WAL compression error"); + s.SetDataLoss(true); + break; + } else if (left == 0) { + // Nothing left to compress + if (!compress_start) { + break; + } + } + compress_start = false; + ptr = compressed_buffer_.get(); + } + const size_t fragment_length = (left < avail) ? left : avail; RecordType type; - const bool end = (left == fragment_length); + const bool end = (left == fragment_length && compress_remaining == 0); if (begin && end) { type = recycle_log_files_ ? kRecyclableFullType : kFullType; } else if (begin) { @@ -105,7 +140,7 @@ IOStatus Writer::AddRecord(const Slice& slice, ptr += fragment_length; left -= fragment_length; begin = false; - } while (s.ok() && left > 0); + } while (s.ok() && (left > 0 || compress_remaining > 0)); if (s.ok()) { if (!manual_flush_) { @@ -134,6 +169,18 @@ IOStatus Writer::AddCompressionTypeRecord() { if (!manual_flush_) { s = dest_->Flush(); } + // Initialize fields required for compression + const size_t max_output_buffer_len = + kBlockSize - (recycle_log_files_ ? kRecyclableHeaderSize : kHeaderSize); + CompressionOptions opts; + constexpr uint32_t compression_format_version = 2; + compress_ = StreamingCompress::Create(compression_type_, opts, + compression_format_version, + max_output_buffer_len); + assert(compress_ != nullptr); + compressed_buffer_ = + std::unique_ptr(new char[max_output_buffer_len]); + assert(compressed_buffer_); } else { // Disable compression if the record could not be added. compression_type_ = kNoCompression; diff --git a/db/log_writer.h b/db/log_writer.h index ec0e4788e..6f7e9d103 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -17,6 +17,7 @@ #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "util/compression.h" namespace ROCKSDB_NAMESPACE { @@ -118,6 +119,9 @@ class Writer { // Compression Type CompressionType compression_type_; + StreamingCompress* compress_; + // Reusable compressed output buffer + std::unique_ptr compressed_buffer_; }; } // namespace log diff --git a/util/compression.cc b/util/compression.cc index aacb72d22..8e2f01b12 100644 --- a/util/compression.cc +++ b/util/compression.cc @@ -41,10 +41,13 @@ StreamingUncompress* StreamingUncompress::Create( } int ZSTDStreamingCompress::Compress(const char* input, size_t input_size, - char* output, size_t* output_size) { - assert(input != nullptr && output != nullptr && input_size > 0 && - output_size != nullptr); - *output_size = 0; + char* output, size_t* output_pos) { + assert(input != nullptr && output != nullptr && output_pos != nullptr); + *output_pos = 0; + // Don't need to compress an empty input + if (input_size == 0) { + return 0; + } #ifndef ZSTD_STREAMING (void)input; (void)input_size; @@ -61,15 +64,15 @@ int ZSTDStreamingCompress::Compress(const char* input, size_t input_size, } ZSTD_outBuffer output_buffer = {output, max_output_len_, /*pos=*/0}; const size_t remaining = - ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_flush); + ZSTD_compressStream2(cctx_, &output_buffer, &input_buffer_, ZSTD_e_end); if (ZSTD_isError(remaining)) { // Failure Reset(); return -1; } // Success - *output_size = output_buffer.pos; - return (int)(input_buffer_.size - input_buffer_.pos); + *output_pos = output_buffer.pos; + return (int)remaining; #endif } @@ -81,10 +84,13 @@ void ZSTDStreamingCompress::Reset() { } int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, - char* output, size_t* output_size) { - assert(input != nullptr && output != nullptr && input_size > 0 && - output_size != nullptr); - *output_size = 0; + char* output, size_t* output_pos) { + assert(input != nullptr && output != nullptr && output_pos != nullptr); + *output_pos = 0; + // Don't need to uncompress an empty input + if (input_size == 0) { + return 0; + } #ifdef ZSTD_STREAMING if (input_buffer_.src != input) { // New input @@ -96,7 +102,7 @@ int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, Reset(); return -1; } - *output_size = output_buffer.pos; + *output_pos = output_buffer.pos; return (int)(input_buffer_.size - input_buffer_.pos); #else (void)input; diff --git a/util/compression.h b/util/compression.h index 274f1487e..3e10aa6ba 100644 --- a/util/compression.h +++ b/util/compression.h @@ -1622,7 +1622,7 @@ class StreamingCompress { // Returns -1 for errors, the remaining size of the input buffer that needs to // be compressed virtual int Compress(const char* input, size_t input_size, char* output, - size_t* output_size) = 0; + size_t* output_pos) = 0; // static method to create object of a class inherited from StreamingCompress // based on the actual compression type. static StreamingCompress* Create(CompressionType compression_type, @@ -1662,7 +1662,7 @@ class StreamingUncompress { // output_size - size of the output buffer // Returns -1 for errors, remaining input to be processed otherwise. virtual int Uncompress(const char* input, size_t input_size, char* output, - size_t* output_size) = 0; + size_t* output_pos) = 0; static StreamingUncompress* Create(CompressionType compression_type, uint32_t compress_format_version, size_t max_output_len); @@ -1693,7 +1693,7 @@ class ZSTDStreamingCompress final : public StreamingCompress { #endif } int Compress(const char* input, size_t input_size, char* output, - size_t* output_size) override; + size_t* output_pos) override; void Reset() override; #ifdef ZSTD_STREAMING ZSTD_CCtx* cctx_;