From 39b0d92153755c59a833099f49ab89dc6f99f6a8 Mon Sep 17 00:00:00 2001 From: Siddhartha Roychowdhury Date: Thu, 17 Feb 2022 16:18:01 -0800 Subject: [PATCH] Add record to set WAL compression type if enabled (#9556) Summary: When WAL compression is enabled, add a record (new record type) to store the compression type to indicate that all subsequent records are compressed. The log reader will store the compression type when this record is encountered and use the type to uncompress the subsequent records. Compress and uncompress to be implemented in subsequent diffs. Enabled WAL compression in some WAL tests to check for regressions. Some tests that rely on offsets have been disabled. Pull Request resolved: https://github.com/facebook/rocksdb/pull/9556 Reviewed By: anand1976 Differential Revision: D34308216 Pulled By: sidroyc fbshipit-source-id: 7f10595e46f3277f1ea2d309fbf95e2e935a8705 --- db/db_impl/db_impl_open.cc | 7 +++-- db/db_wal_test.cc | 35 +++++++++++++++++------ db/log_format.h | 5 +++- db/log_reader.cc | 57 ++++++++++++++++++++++++++++++++++++- db/log_reader.h | 11 +++++++- db/log_test.cc | 48 +++++++++++++++++++++++-------- db/log_writer.cc | 33 ++++++++++++++++++++-- db/log_writer.h | 8 +++++- include/rocksdb/options.h | 3 +- tools/db_bench_tool.cc | 2 +- util/compression.h | 58 ++++++++++++++++++++++++++++++++++++++ 11 files changed, 236 insertions(+), 31 deletions(-) diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index f15e6b62e..ee79eb9f0 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -191,8 +191,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, #endif // !ROCKSDB_LITE // Supported wal compression types - if (result.wal_compression != kNoCompression && - result.wal_compression != kZSTD) { + if (!StreamingCompressionTypeSupported(result.wal_compression)) { result.wal_compression = kNoCompression; ROCKS_LOG_WARN(result.info_log, "wal_compression is disabled since only zstd is supported"); @@ -1586,7 +1585,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, tmp_set.Contains(FileType::kWalFile))); *new_log = new log::Writer(std::move(file_writer), log_file_num, immutable_db_options_.recycle_log_file_num > 0, - immutable_db_options_.manual_wal_flush); + immutable_db_options_.manual_wal_flush, + immutable_db_options_.wal_compression); + io_s = (*new_log)->AddCompressionTypeRecord(); } return io_s; } diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index c46db3362..8a61dadaf 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1280,9 +1280,12 @@ class RecoveryTestHelper { ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(), fname, file_options, &file_writer, nullptr)); - current_log_writer.reset( + log::Writer* log_writer = new log::Writer(std::move(file_writer), current_log_number, - db_options.recycle_log_file_num > 0)); + db_options.recycle_log_file_num > 0, false, + db_options.wal_compression); + ASSERT_OK(log_writer->AddCompressionTypeRecord()); + current_log_writer.reset(log_writer); WriteBatch batch; for (int i = 0; i < kKeysPerWALFile; i++) { @@ -1351,9 +1354,9 @@ class RecoveryTestHelper { } }; -class DBWALTestWithParams - : public DBWALTestBase, - public ::testing::WithParamInterface> { +class DBWALTestWithParams : public DBWALTestBase, + public ::testing::WithParamInterface< + std::tuple> { public: DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {} }; @@ -1364,12 +1367,14 @@ INSTANTIATE_TEST_CASE_P( ::testing::Range(RecoveryTestHelper::kWALFileOffset, RecoveryTestHelper::kWALFileOffset + RecoveryTestHelper::kWALFilesCount, - 1))); + 1), + ::testing::Values(CompressionType::kNoCompression, + CompressionType::kZSTD))); class DBWALTestWithParamsVaryingRecoveryMode : public DBWALTestBase, public ::testing::WithParamInterface< - std::tuple> { + std::tuple> { public: DBWALTestWithParamsVaryingRecoveryMode() : DBWALTestBase("/db_wal_test_with_params_mode") {} @@ -1386,7 +1391,9 @@ INSTANTIATE_TEST_CASE_P( ::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords, WALRecoveryMode::kAbsoluteConsistency, WALRecoveryMode::kPointInTimeRecovery, - WALRecoveryMode::kSkipAnyCorruptedRecords))); + WALRecoveryMode::kSkipAnyCorruptedRecords), + ::testing::Values(CompressionType::kNoCompression, + CompressionType::kZSTD))); // Test scope: // - We expect to open the data store when there is incomplete trailing writes @@ -1432,6 +1439,9 @@ TEST_P(DBWALTestWithParams, kAbsoluteConsistency) { // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); + options.wal_compression = compression_type; if (trunc && corrupt_offset == 0) { return; @@ -1492,9 +1502,12 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) { // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); // Fill data for testing Options options = CurrentOptions(); + options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); // Corrupt the wal @@ -1543,9 +1556,12 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) { // Corruption offset position int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file + // WAL compression type + CompressionType compression_type = std::get<3>(GetParam()); // Fill data for testing Options options = CurrentOptions(); + options.wal_compression = compression_type; const size_t row_count = RecoveryTestHelper::FillData(this, &options); // Corrupt the WAL @@ -1769,8 +1785,11 @@ TEST_P(DBWALTestWithParamsVaryingRecoveryMode, int corrupt_offset = std::get<1>(GetParam()); int wal_file_id = std::get<2>(GetParam()); // WAL file WALRecoveryMode recovery_mode = std::get<3>(GetParam()); + // WAL compression type + CompressionType compression_type = std::get<4>(GetParam()); options.wal_recovery_mode = recovery_mode; + options.wal_compression = compression_type; // Create corrupted WAL RecoveryTestHelper::FillData(this, &options); RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3, diff --git a/db/log_format.h b/db/log_format.h index c22e2b6bc..d397372f4 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -32,8 +32,11 @@ enum RecordType { kRecyclableFirstType = 6, kRecyclableMiddleType = 7, kRecyclableLastType = 8, + + // Compression Type + kSetCompressionType = 9, }; -static const int kMaxRecordType = kRecyclableLastType; +static const int kMaxRecordType = kSetCompressionType; static const unsigned int kBlockSize = 32768; diff --git a/db/log_reader.cc b/db/log_reader.cc index 84082e8ea..d88eb08cb 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -38,7 +38,10 @@ Reader::Reader(std::shared_ptr info_log, last_record_offset_(0), end_of_buffer_offset_(0), log_number_(log_num), - recycled_(false) {} + recycled_(false), + first_record_read_(false), + compression_type_(kNoCompression), + compression_type_record_read_(false) {} Reader::~Reader() { delete[] backing_store_; @@ -79,6 +82,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, scratch->clear(); *record = fragment; last_record_offset_ = prospective_record_offset; + first_record_read_ = true; return true; case kFirstType: @@ -114,6 +118,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, scratch->append(fragment.data(), fragment.size()); *record = Slice(*scratch); last_record_offset_ = prospective_record_offset; + first_record_read_ = true; return true; } break; @@ -212,6 +217,30 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; + case kSetCompressionType: { + if (compression_type_record_read_) { + ReportCorruption(fragment.size(), + "read multiple SetCompressionType records"); + } + if (first_record_read_) { + ReportCorruption(fragment.size(), + "SetCompressionType not the first record"); + } + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + CompressionTypeRecord compression_record(kNoCompression); + Status s = compression_record.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode SetCompressionType record"); + } else { + compression_type_ = compression_record.GetCompressionType(); + compression_type_record_read_ = true; + } + break; + } + default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", record_type); @@ -449,6 +478,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, *record = fragment; prospective_record_offset = physical_record_offset; last_record_offset_ = prospective_record_offset; + first_record_read_ = true; in_fragmented_record_ = false; return true; @@ -483,6 +513,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, fragments_.clear(); *record = Slice(*scratch); last_record_offset_ = prospective_record_offset; + first_record_read_ = true; in_fragmented_record_ = false; return true; } @@ -512,6 +543,30 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, } break; + case kSetCompressionType: { + if (compression_type_record_read_) { + ReportCorruption(fragment.size(), + "read multiple SetCompressionType records"); + } + if (first_record_read_) { + ReportCorruption(fragment.size(), + "SetCompressionType not the first record"); + } + fragments_.clear(); + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + CompressionTypeRecord compression_record(kNoCompression); + Status s = compression_record.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode SetCompressionType record"); + } else { + compression_type_ = compression_record.GetCompressionType(); + } + break; + } + default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", diff --git a/db/log_reader.h b/db/log_reader.h index 1ddc4abdd..cd2d795c8 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -8,14 +8,16 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once -#include #include +#include + #include "db/log_format.h" #include "file/sequence_file_reader.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" +#include "util/compression.h" namespace ROCKSDB_NAMESPACE { class Logger; @@ -128,6 +130,13 @@ class Reader { // Whether this is a recycled log file bool recycled_; + // Whether the first record has been read or not. + bool first_record_read_; + // Type of compression used + CompressionType compression_type_; + // Track whether the compression type record has been read or not. + bool compression_type_record_read_; + // Extend record types with the following special values enum { kEof = kMaxRecordType + 1, diff --git a/db/log_test.cc b/db/log_test.cc index 2e993d8f9..3723347c8 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -47,7 +47,8 @@ static std::string RandomSkewedString(int i, Random* rnd) { // Param type is tuple // get<0>(tuple): non-zero if recycling log, zero if regular log // get<1>(tuple): true if allow retry after read EOF, false otherwise -class LogTest : public ::testing::TestWithParam> { +class LogTest + : public ::testing::TestWithParam> { private: class StringSource : public FSSequentialFile { public: @@ -143,23 +144,27 @@ class LogTest : public ::testing::TestWithParam> { test::StringSink* sink_; StringSource* source_; ReportCollector report_; - std::unique_ptr writer_; - std::unique_ptr reader_; protected: + std::unique_ptr writer_; + std::unique_ptr reader_; bool allow_retry_read_; + CompressionType compression_type_; public: LogTest() : reader_contents_(), sink_(new test::StringSink(&reader_contents_)), source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))), - allow_retry_read_(std::get<1>(GetParam())) { + allow_retry_read_(std::get<1>(GetParam())), + compression_type_(std::get<2>(GetParam())) { std::unique_ptr sink_holder(sink_); std::unique_ptr file_writer(new WritableFileWriter( std::move(sink_holder), "" /* don't care */, FileOptions())); - writer_.reset( - new Writer(std::move(file_writer), 123, std::get<0>(GetParam()))); + Writer* writer = + new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false, + compression_type_); + writer_.reset(writer); std::unique_ptr source_holder(source_); std::unique_ptr file_reader( new SequentialFileReader(std::move(source_holder), "" /* file name */)); @@ -676,11 +681,11 @@ TEST_P(LogTest, Recycle) { ASSERT_EQ("EOF", Read()); } -INSTANTIATE_TEST_CASE_P(bool, LogTest, - ::testing::Values(std::make_tuple(0, false), - std::make_tuple(0, true), - std::make_tuple(1, false), - std::make_tuple(1, true))); +// Do NOT enable compression for this instantiation. +INSTANTIATE_TEST_CASE_P( + Log, LogTest, + ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), + ::testing::Values(CompressionType::kNoCompression))); class RetriableLogTest : public ::testing::TestWithParam { private: @@ -892,6 +897,27 @@ TEST_P(RetriableLogTest, NonBlockingReadFullRecord) { INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2)); +class CompressionLogTest : public LogTest { + public: + Status SetupTestEnv() { return writer_->AddCompressionTypeRecord(); } +}; + +TEST_P(CompressionLogTest, Empty) { + ASSERT_OK(SetupTestEnv()); + const bool compression_enabled = + std::get<2>(GetParam()) == kNoCompression ? false : true; + // If WAL compression is enabled, a record is added for the compression type + const int compression_record_size = compression_enabled ? kHeaderSize + 4 : 0; + ASSERT_EQ(compression_record_size, WrittenBytes()); + ASSERT_EQ("EOF", Read()); +} + +INSTANTIATE_TEST_CASE_P( + Compression, CompressionLogTest, + ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), + ::testing::Values(CompressionType::kNoCompression, + CompressionType::kZSTD))); + } // namespace log } // namespace ROCKSDB_NAMESPACE diff --git a/db/log_writer.cc b/db/log_writer.cc index e2e596596..410c634cc 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -19,12 +19,14 @@ namespace ROCKSDB_NAMESPACE { namespace log { Writer::Writer(std::unique_ptr&& dest, uint64_t log_number, - bool recycle_log_files, bool manual_flush) + bool recycle_log_files, bool manual_flush, + CompressionType compression_type) : dest_(std::move(dest)), block_offset_(0), log_number_(log_number), recycle_log_files_(recycle_log_files), - manual_flush_(manual_flush) { + manual_flush_(manual_flush), + compression_type_(compression_type) { for (int i = 0; i <= kMaxRecordType; i++) { char t = static_cast(i); type_crc_[i] = crc32c::Value(&t, 1); @@ -112,6 +114,31 @@ IOStatus Writer::AddRecord(const Slice& slice) { return s; } +IOStatus Writer::AddCompressionTypeRecord() { + // Should be the first record + assert(block_offset_ == 0); + + if (compression_type_ == kNoCompression) { + // No need to add a record + return IOStatus::OK(); + } + + CompressionTypeRecord record(compression_type_); + std::string encode; + record.EncodeTo(&encode); + IOStatus s = + EmitPhysicalRecord(kSetCompressionType, encode.data(), encode.size()); + if (s.ok()) { + if (!manual_flush_) { + s = dest_->Flush(); + } + } else { + // Disable compression if the record could not be added. + compression_type_ = kNoCompression; + } + return s; +} + bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { @@ -126,7 +153,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { buf[6] = static_cast(t); uint32_t crc = type_crc_[t]; - if (t < kRecyclableFullType) { + if (t < kRecyclableFullType || t == kSetCompressionType) { // Legacy record format assert(block_offset_ + kHeaderSize + n <= kBlockSize); header_size = kHeaderSize; diff --git a/db/log_writer.h b/db/log_writer.h index 1a91b2199..8f60049db 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -12,6 +12,7 @@ #include #include "db/log_format.h" +#include "rocksdb/compression_type.h" #include "rocksdb/io_status.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -72,7 +73,8 @@ class Writer { // "*dest" must remain live while this Writer is in use. explicit Writer(std::unique_ptr&& dest, uint64_t log_number, bool recycle_log_files, - bool manual_flush = false); + bool manual_flush = false, + CompressionType compressionType = kNoCompression); // No copying allowed Writer(const Writer&) = delete; void operator=(const Writer&) = delete; @@ -80,6 +82,7 @@ class Writer { ~Writer(); IOStatus AddRecord(const Slice& slice); + IOStatus AddCompressionTypeRecord(); WritableFileWriter* file() { return dest_.get(); } const WritableFileWriter* file() const { return dest_.get(); } @@ -108,6 +111,9 @@ class Writer { // If true, it does not flush after each write. Instead it relies on the upper // layer to manually does the flush by calling ::WriteBuffer() bool manual_flush_; + + // Compression Type + CompressionType compression_type_; }; } // namespace log diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index cbbf4cfd0..bc47ba410 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1182,7 +1182,8 @@ struct DBOptions { // This feature is WORK IN PROGRESS // If enabled WAL records will be compressed before they are written. - // Only zstd is supported. + // Only zstd is supported. Compressed WAL records will be read in supported + // versions regardless of the wal_compression settings. CompressionType wal_compression = kNoCompression; // If true, RocksDB supports flushing multiple column families and committing diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index b4df6a432..4d5cd5b05 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -763,7 +763,7 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write."); DEFINE_bool(manual_wal_flush, false, "If true, buffer WAL until buffer is full or a manual FlushWAL()."); -DEFINE_string(wal_compression, "string", +DEFINE_string(wal_compression, "none", "Algorithm to use for WAL compression. none to disable."); static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e = ROCKSDB_NAMESPACE::kNoCompression; diff --git a/util/compression.h b/util/compression.h index a58b2b575..8f21807e6 100644 --- a/util/compression.h +++ b/util/compression.h @@ -514,6 +514,26 @@ inline bool ZSTDNotFinal_Supported() { #endif } +inline bool ZSTD_Streaming_Supported() { +#ifdef ZSTD + return ZSTD_versionNumber() >= 10300; +#else + return false; +#endif +} + +inline bool StreamingCompressionTypeSupported( + CompressionType compression_type) { + switch (compression_type) { + case kNoCompression: + return true; + case kZSTD: + return ZSTD_Streaming_Supported(); + default: + return false; + } +} + inline bool CompressionTypeSupported(CompressionType compression_type) { switch (compression_type) { case kNoCompression: @@ -1535,4 +1555,42 @@ inline CacheAllocationPtr UncompressData( } } +// Records the compression type for subsequent WAL records. +class CompressionTypeRecord { + public: + explicit CompressionTypeRecord(CompressionType compression_type) + : compression_type_(compression_type) {} + + CompressionType GetCompressionType() const { return compression_type_; } + + inline void EncodeTo(std::string* dst) const { + assert(dst != nullptr); + PutFixed32(dst, compression_type_); + } + + inline Status DecodeFrom(Slice* src) { + constexpr char class_name[] = "CompressionTypeRecord"; + + uint32_t val; + if (!GetFixed32(src, &val)) { + return Status::Corruption(class_name, + "Error decoding WAL compression type"); + } + CompressionType compression_type = static_cast(val); + if (!StreamingCompressionTypeSupported(compression_type)) { + return Status::Corruption(class_name, + "WAL compression type not supported"); + } + compression_type_ = compression_type; + return Status::OK(); + } + + inline std::string DebugString() const { + return "compression_type: " + CompressionTypeToString(compression_type_); + } + + private: + CompressionType compression_type_; +}; + } // namespace ROCKSDB_NAMESPACE