diff --git a/HISTORY.md b/HISTORY.md
index 316e5cb7a..3bf9c2282 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -12,6 +12,7 @@
 * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
 * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
 * Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
+* Fixed a bug in DB open/recovery from a compressed WAL that was caused by incorrect handling of certain record fragments with the same offset within a WAL block.
 
 ### Feature Removal
 * Remove RocksDB Lite.
diff --git a/db/log_reader.cc b/db/log_reader.cc
index a21868776..575a7d758 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -515,10 +515,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
       size_t uncompressed_size = 0;
       int remaining = 0;
+      const char* input = header + header_size;
       do {
-        remaining = uncompress_->Uncompress(header + header_size, length,
-                                            uncompressed_buffer_.get(),
-                                            &uncompressed_size);
+        remaining = uncompress_->Uncompress(
+            input, length, uncompressed_buffer_.get(), &uncompressed_size);
+        input = nullptr;
         if (remaining < 0) {
           buffer_.clear();
           return kBadRecord;
@@ -830,10 +831,11 @@ bool FragmentBufferedReader::TryReadFragment(
       uncompressed_record_.clear();
       size_t uncompressed_size = 0;
       int remaining = 0;
+      const char* input = header + header_size;
       do {
-        remaining = uncompress_->Uncompress(header + header_size, length,
-                                            uncompressed_buffer_.get(),
-                                            &uncompressed_size);
+        remaining = uncompress_->Uncompress(
+            input, length, uncompressed_buffer_.get(), &uncompressed_size);
+        input = nullptr;
         if (remaining < 0) {
           buffer_.clear();
           *fragment_type_or_err = kBadRecord;
diff --git a/db/log_test.cc b/db/log_test.cc
index 2a43dc152..f4d388f41 100644
--- a/db/log_test.cc
+++ b/db/log_test.cc
@@ -979,6 +979,38 @@ TEST_P(CompressionLogTest, Fragmentation) {
   ASSERT_EQ("EOF", Read());
 }
 
+TEST_P(CompressionLogTest, AlignedFragmentation) {
+  CompressionType compression_type = std::get<2>(GetParam());
+  if (!StreamingCompressionTypeSupported(compression_type)) {
+    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
+    return;
+  }
+  ASSERT_OK(SetupTestEnv());
+  Random rnd(301);
+  int num_filler_records = 0;
+  // Keep writing small records until the next record will be aligned at the
+  // beginning of the block.
+  while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
+    char entry = 'a';
+    ASSERT_OK(writer_->AddRecord(Slice(&entry, 1)));
+    num_filler_records++;
+  }
+  const std::vector<std::string> wal_entries = {
+      rnd.RandomBinaryString(3 * kBlockSize),
+  };
+  for (const std::string& wal_entry : wal_entries) {
+    Write(wal_entry);
+  }
+
+  for (int i = 0; i < num_filler_records; ++i) {
+    ASSERT_EQ("a", Read());
+  }
+  for (const std::string& wal_entry : wal_entries) {
+    ASSERT_EQ(wal_entry, Read());
+  }
+  ASSERT_EQ("EOF", Read());
+}
+
 INSTANTIATE_TEST_CASE_P(
     Compression, CompressionLogTest,
     ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
@@ -1026,10 +1058,11 @@ TEST_P(StreamingCompressionTest, Basic) {
   for (int i = 0; i < (int)compressed_buffers.size(); i++) {
     // Call uncompress till either the entire input is consumed or the output
     // buffer size is equal to the allocated output buffer size.
+    const char* input = compressed_buffers[i].c_str();
     do {
-      ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(),
-                                       compressed_buffers[i].size(),
+      ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
                                        uncompressed_output_buffer, &output_pos);
+      input = nullptr;
       if (output_pos > 0) {
         std::string uncompressed_fragment;
         uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);
diff --git a/util/compression.cc b/util/compression.cc
index 8e2f01b12..712d333ee 100644
--- a/util/compression.cc
+++ b/util/compression.cc
@@ -85,14 +85,14 @@ void ZSTDStreamingCompress::Reset() {
 
 int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
                                         char* output, size_t* output_pos) {
-  assert(input != nullptr && output != nullptr && output_pos != nullptr);
+  assert(output != nullptr && output_pos != nullptr);
   *output_pos = 0;
   // Don't need to uncompress an empty input
   if (input_size == 0) {
     return 0;
   }
 #ifdef ZSTD_STREAMING
-  if (input_buffer_.src != input) {
+  if (input) {
     // New input
     input_buffer_ = {input, input_size, /*pos=*/0};
   }
diff --git a/util/compression.h b/util/compression.h
index 2185d5213..31ff5a755 100644
--- a/util/compression.h
+++ b/util/compression.h
@@ -1711,8 +1711,11 @@ class StreamingUncompress {
         compress_format_version_(compress_format_version),
         max_output_len_(max_output_len) {}
   virtual ~StreamingUncompress() = default;
-  // uncompress should be called again with the same input if output_size is
-  // equal to max_output_len or with the next input fragment.
+  // Uncompress can be called repeatedly to progressively process the same
+  // input buffer, or can be called with a new input buffer. When the input
+  // buffer is not fully consumed, the return value is > 0 or output_size
+  // == max_output_len. When calling uncompress to continue processing the
+  // same input buffer, the input argument should be nullptr.
   // Parameters:
   // input - buffer to uncompress
   // input_size - size of input buffer
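
Reviewer note on the calling convention this patch introduces: Uncompress() now treats a non-null input pointer as the start of a new input buffer, and nullptr as "continue draining the buffer from the previous call". The old pointer comparison (input_buffer_.src != input) misclassified a new fragment as a continuation whenever it started at the same address as the previous one, which is exactly what the AlignedFragmentation test provokes with block-aligned records. The sketch below illustrates the convention for callers; the helper name, its parameters, and the error handling are illustrative only and not part of the patch.

#include <cstddef>
#include <string>

#include "util/compression.h"  // StreamingUncompress (RocksDB-internal header)

// Hypothetical helper: drain one compressed WAL fragment through a
// StreamingUncompress object, following the loop shape in
// Reader::ReadPhysicalRecord above.
std::string UncompressFragment(ROCKSDB_NAMESPACE::StreamingUncompress* uncompress,
                               const char* fragment, size_t fragment_size,
                               char* output_buffer, size_t max_output_len) {
  std::string result;
  size_t output_size = 0;
  int remaining = 0;
  // Pass the real pointer only on the first call for this fragment.
  const char* input = fragment;
  do {
    remaining = uncompress->Uncompress(input, fragment_size, output_buffer,
                                       &output_size);
    // Every later iteration passes nullptr so the uncompressor keeps
    // consuming the input it already holds, regardless of where the next
    // fragment happens to live in memory.
    input = nullptr;
    if (remaining < 0) {
      return std::string();  // corrupt fragment; the log reader reports kBadRecord
    }
    result.append(output_buffer, output_size);
    // Per the updated header comment: the input is not fully consumed while
    // the return value is > 0 or the output buffer was filled to capacity.
  } while (remaining > 0 || output_size == max_output_len);
  return result;
}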