Fix bug in WAL streaming uncompression (#11198)

Summary:
Fix a bug in the calculation of the input buffer address/offset in log_reader.cc. The bug is when consecutive fragments of a compressed record are located at the same offset in the log reader buffer, the second fragment input buffer is treated as a leftover from the previous input buffer. As a result, the offset in the `ZSTD_inBuffer` is not reset.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11198

Test Plan: Add a unit test in log_test.cc that fails without the fix and passes with it.

Reviewed By: ajkr, cbi42

Differential Revision: D43102692

Pulled By: anand1976

fbshipit-source-id: aa2648f4802c33991b76a3233c5a58d4cc9e77fd
oxigraph-8.1.1
anand76 2 years ago committed by Facebook GitHub Bot
parent 876d281592
commit 77b61abc7b
  1. 1
      HISTORY.md
  2. 14
      db/log_reader.cc
  3. 37
      db/log_test.cc
  4. 4
      util/compression.cc
  5. 7
      util/compression.h

@ -12,6 +12,7 @@
* Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish * Fixed `DisableManualCompaction()` and `CompactRangeOptions::canceled` to cancel compactions even when they are waiting on conflicting compactions to finish
* Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()` * Fixed a bug in which a successful `GetMergeOperands()` could transiently return `Status::MergeInProgress()`
* Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support. * Return the correct error (Status::NotSupported()) to MultiGet caller when ReadOptions::async_io flag is true and IO uring is not enabled. Previously, Status::Corruption() was being returned when the actual failure was lack of async IO support.
* Fixed a bug in DB open/recovery from a compressed WAL that was caused due to incorrect handling of certain record fragments with the same offset within a WAL block.
### Feature Removal ### Feature Removal
* Remove RocksDB Lite. * Remove RocksDB Lite.

@ -515,10 +515,11 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
size_t uncompressed_size = 0; size_t uncompressed_size = 0;
int remaining = 0; int remaining = 0;
const char* input = header + header_size;
do { do {
remaining = uncompress_->Uncompress(header + header_size, length, remaining = uncompress_->Uncompress(
uncompressed_buffer_.get(), input, length, uncompressed_buffer_.get(), &uncompressed_size);
&uncompressed_size); input = nullptr;
if (remaining < 0) { if (remaining < 0) {
buffer_.clear(); buffer_.clear();
return kBadRecord; return kBadRecord;
@ -830,10 +831,11 @@ bool FragmentBufferedReader::TryReadFragment(
uncompressed_record_.clear(); uncompressed_record_.clear();
size_t uncompressed_size = 0; size_t uncompressed_size = 0;
int remaining = 0; int remaining = 0;
const char* input = header + header_size;
do { do {
remaining = uncompress_->Uncompress(header + header_size, length, remaining = uncompress_->Uncompress(
uncompressed_buffer_.get(), input, length, uncompressed_buffer_.get(), &uncompressed_size);
&uncompressed_size); input = nullptr;
if (remaining < 0) { if (remaining < 0) {
buffer_.clear(); buffer_.clear();
*fragment_type_or_err = kBadRecord; *fragment_type_or_err = kBadRecord;

@ -979,6 +979,38 @@ TEST_P(CompressionLogTest, Fragmentation) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST_P(CompressionLogTest, AlignedFragmentation) {
CompressionType compression_type = std::get<2>(GetParam());
if (!StreamingCompressionTypeSupported(compression_type)) {
ROCKSDB_GTEST_SKIP("Test requires support for compression type");
return;
}
ASSERT_OK(SetupTestEnv());
Random rnd(301);
int num_filler_records = 0;
// Keep writing small records until the next record will be aligned at the
// beginning of the block.
while ((WrittenBytes() & (kBlockSize - 1)) >= kHeaderSize) {
char entry = 'a';
ASSERT_OK(writer_->AddRecord(Slice(&entry, 1)));
num_filler_records++;
}
const std::vector<std::string> wal_entries = {
rnd.RandomBinaryString(3 * kBlockSize),
};
for (const std::string& wal_entry : wal_entries) {
Write(wal_entry);
}
for (int i = 0; i < num_filler_records; ++i) {
ASSERT_EQ("a", Read());
}
for (const std::string& wal_entry : wal_entries) {
ASSERT_EQ(wal_entry, Read());
}
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P( INSTANTIATE_TEST_CASE_P(
Compression, CompressionLogTest, Compression, CompressionLogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(), ::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
@ -1026,10 +1058,11 @@ TEST_P(StreamingCompressionTest, Basic) {
for (int i = 0; i < (int)compressed_buffers.size(); i++) { for (int i = 0; i < (int)compressed_buffers.size(); i++) {
// Call uncompress till either the entire input is consumed or the output // Call uncompress till either the entire input is consumed or the output
// buffer size is equal to the allocated output buffer size. // buffer size is equal to the allocated output buffer size.
const char* input = compressed_buffers[i].c_str();
do { do {
ret_val = uncompress->Uncompress(compressed_buffers[i].c_str(), ret_val = uncompress->Uncompress(input, compressed_buffers[i].size(),
compressed_buffers[i].size(),
uncompressed_output_buffer, &output_pos); uncompressed_output_buffer, &output_pos);
input = nullptr;
if (output_pos > 0) { if (output_pos > 0) {
std::string uncompressed_fragment; std::string uncompressed_fragment;
uncompressed_fragment.assign(uncompressed_output_buffer, output_pos); uncompressed_fragment.assign(uncompressed_output_buffer, output_pos);

@ -85,14 +85,14 @@ void ZSTDStreamingCompress::Reset() {
int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size, int ZSTDStreamingUncompress::Uncompress(const char* input, size_t input_size,
char* output, size_t* output_pos) { char* output, size_t* output_pos) {
assert(input != nullptr && output != nullptr && output_pos != nullptr); assert(output != nullptr && output_pos != nullptr);
*output_pos = 0; *output_pos = 0;
// Don't need to uncompress an empty input // Don't need to uncompress an empty input
if (input_size == 0) { if (input_size == 0) {
return 0; return 0;
} }
#ifdef ZSTD_STREAMING #ifdef ZSTD_STREAMING
if (input_buffer_.src != input) { if (input) {
// New input // New input
input_buffer_ = {input, input_size, /*pos=*/0}; input_buffer_ = {input, input_size, /*pos=*/0};
} }

@ -1711,8 +1711,11 @@ class StreamingUncompress {
compress_format_version_(compress_format_version), compress_format_version_(compress_format_version),
max_output_len_(max_output_len) {} max_output_len_(max_output_len) {}
virtual ~StreamingUncompress() = default; virtual ~StreamingUncompress() = default;
// uncompress should be called again with the same input if output_size is // Uncompress can be called repeatedly to progressively process the same
// equal to max_output_len or with the next input fragment. // input buffer, or can be called with a new input buffer. When the input
// buffer is not fully consumed, the return value is > 0 or output_size
// == max_output_len. When calling uncompress to continue processing the
// same input buffer, the input argument should be nullptr.
// Parameters: // Parameters:
// input - buffer to uncompress // input - buffer to uncompress
// input_size - size of input buffer // input_size - size of input buffer

Loading…
Cancel
Save