Add record to set WAL compression type if enabled (#9556)

Summary:
When WAL compression is enabled, add a record (new record type) to store the compression type to indicate that all subsequent records are compressed. The log reader will store the compression type when this record is encountered and use the type to uncompress the subsequent records. Compress and uncompress to be implemented in subsequent diffs.
Enabled WAL compression in some WAL tests to check for regressions. Some tests that rely on offsets have been disabled.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9556

Reviewed By: anand1976

Differential Revision: D34308216

Pulled By: sidroyc

fbshipit-source-id: 7f10595e46f3277f1ea2d309fbf95e2e935a8705
main
Siddhartha Roychowdhury 2 years ago committed by Facebook GitHub Bot
parent f092f0fa5d
commit 39b0d92153
  1. 7
      db/db_impl/db_impl_open.cc
  2. 35
      db/db_wal_test.cc
  3. 5
      db/log_format.h
  4. 57
      db/log_reader.cc
  5. 11
      db/log_reader.h
  6. 48
      db/log_test.cc
  7. 33
      db/log_writer.cc
  8. 8
      db/log_writer.h
  9. 3
      include/rocksdb/options.h
  10. 2
      tools/db_bench_tool.cc
  11. 58
      util/compression.h

@ -191,8 +191,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
#endif // !ROCKSDB_LITE
// Supported wal compression types
if (result.wal_compression != kNoCompression &&
result.wal_compression != kZSTD) {
if (!StreamingCompressionTypeSupported(result.wal_compression)) {
result.wal_compression = kNoCompression;
ROCKS_LOG_WARN(result.info_log,
"wal_compression is disabled since only zstd is supported");
@ -1586,7 +1585,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
tmp_set.Contains(FileType::kWalFile)));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);
immutable_db_options_.manual_wal_flush,
immutable_db_options_.wal_compression);
io_s = (*new_log)->AddCompressionTypeRecord();
}
return io_s;
}

@ -1280,9 +1280,12 @@ class RecoveryTestHelper {
ASSERT_OK(WritableFileWriter::Create(db_options.env->GetFileSystem(),
fname, file_options, &file_writer,
nullptr));
current_log_writer.reset(
log::Writer* log_writer =
new log::Writer(std::move(file_writer), current_log_number,
db_options.recycle_log_file_num > 0));
db_options.recycle_log_file_num > 0, false,
db_options.wal_compression);
ASSERT_OK(log_writer->AddCompressionTypeRecord());
current_log_writer.reset(log_writer);
WriteBatch batch;
for (int i = 0; i < kKeysPerWALFile; i++) {
@ -1351,9 +1354,9 @@ class RecoveryTestHelper {
}
};
class DBWALTestWithParams
: public DBWALTestBase,
public ::testing::WithParamInterface<std::tuple<bool, int, int>> {
class DBWALTestWithParams : public DBWALTestBase,
public ::testing::WithParamInterface<
std::tuple<bool, int, int, CompressionType>> {
public:
DBWALTestWithParams() : DBWALTestBase("/db_wal_test_with_params") {}
};
@ -1364,12 +1367,14 @@ INSTANTIATE_TEST_CASE_P(
::testing::Range(RecoveryTestHelper::kWALFileOffset,
RecoveryTestHelper::kWALFileOffset +
RecoveryTestHelper::kWALFilesCount,
1)));
1),
::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD)));
class DBWALTestWithParamsVaryingRecoveryMode
: public DBWALTestBase,
public ::testing::WithParamInterface<
std::tuple<bool, int, int, WALRecoveryMode>> {
std::tuple<bool, int, int, WALRecoveryMode, CompressionType>> {
public:
DBWALTestWithParamsVaryingRecoveryMode()
: DBWALTestBase("/db_wal_test_with_params_mode") {}
@ -1386,7 +1391,9 @@ INSTANTIATE_TEST_CASE_P(
::testing::Values(WALRecoveryMode::kTolerateCorruptedTailRecords,
WALRecoveryMode::kAbsoluteConsistency,
WALRecoveryMode::kPointInTimeRecovery,
WALRecoveryMode::kSkipAnyCorruptedRecords)));
WALRecoveryMode::kSkipAnyCorruptedRecords),
::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD)));
// Test scope:
// - We expect to open the data store when there is incomplete trailing writes
@ -1432,6 +1439,9 @@ TEST_P(DBWALTestWithParams, kAbsoluteConsistency) {
// Corruption offset position
int corrupt_offset = std::get<1>(GetParam());
int wal_file_id = std::get<2>(GetParam()); // WAL file
// WAL compression type
CompressionType compression_type = std::get<3>(GetParam());
options.wal_compression = compression_type;
if (trunc && corrupt_offset == 0) {
return;
@ -1492,9 +1502,12 @@ TEST_P(DBWALTestWithParams, kPointInTimeRecovery) {
// Corruption offset position
int corrupt_offset = std::get<1>(GetParam());
int wal_file_id = std::get<2>(GetParam()); // WAL file
// WAL compression type
CompressionType compression_type = std::get<3>(GetParam());
// Fill data for testing
Options options = CurrentOptions();
options.wal_compression = compression_type;
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
// Corrupt the wal
@ -1543,9 +1556,12 @@ TEST_P(DBWALTestWithParams, kSkipAnyCorruptedRecords) {
// Corruption offset position
int corrupt_offset = std::get<1>(GetParam());
int wal_file_id = std::get<2>(GetParam()); // WAL file
// WAL compression type
CompressionType compression_type = std::get<3>(GetParam());
// Fill data for testing
Options options = CurrentOptions();
options.wal_compression = compression_type;
const size_t row_count = RecoveryTestHelper::FillData(this, &options);
// Corrupt the WAL
@ -1769,8 +1785,11 @@ TEST_P(DBWALTestWithParamsVaryingRecoveryMode,
int corrupt_offset = std::get<1>(GetParam());
int wal_file_id = std::get<2>(GetParam()); // WAL file
WALRecoveryMode recovery_mode = std::get<3>(GetParam());
// WAL compression type
CompressionType compression_type = std::get<4>(GetParam());
options.wal_recovery_mode = recovery_mode;
options.wal_compression = compression_type;
// Create corrupted WAL
RecoveryTestHelper::FillData(this, &options);
RecoveryTestHelper::CorruptWAL(this, options, corrupt_offset * .3,

@ -32,8 +32,11 @@ enum RecordType {
kRecyclableFirstType = 6,
kRecyclableMiddleType = 7,
kRecyclableLastType = 8,
// Compression Type
kSetCompressionType = 9,
};
static const int kMaxRecordType = kRecyclableLastType;
static const int kMaxRecordType = kSetCompressionType;
static const unsigned int kBlockSize = 32768;

@ -38,7 +38,10 @@ Reader::Reader(std::shared_ptr<Logger> info_log,
last_record_offset_(0),
end_of_buffer_offset_(0),
log_number_(log_num),
recycled_(false) {}
recycled_(false),
first_record_read_(false),
compression_type_(kNoCompression),
compression_type_record_read_(false) {}
Reader::~Reader() {
delete[] backing_store_;
@ -79,6 +82,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
return true;
case kFirstType:
@ -114,6 +118,7 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
return true;
}
break;
@ -212,6 +217,30 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
}
break;
case kSetCompressionType: {
if (compression_type_record_read_) {
ReportCorruption(fragment.size(),
"read multiple SetCompressionType records");
}
if (first_record_read_) {
ReportCorruption(fragment.size(),
"SetCompressionType not the first record");
}
prospective_record_offset = physical_record_offset;
scratch->clear();
last_record_offset_ = prospective_record_offset;
CompressionTypeRecord compression_record(kNoCompression);
Status s = compression_record.DecodeFrom(&fragment);
if (!s.ok()) {
ReportCorruption(fragment.size(),
"could not decode SetCompressionType record");
} else {
compression_type_ = compression_record.GetCompressionType();
compression_type_record_read_ = true;
}
break;
}
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
@ -449,6 +478,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
*record = fragment;
prospective_record_offset = physical_record_offset;
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
in_fragmented_record_ = false;
return true;
@ -483,6 +513,7 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
fragments_.clear();
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
first_record_read_ = true;
in_fragmented_record_ = false;
return true;
}
@ -512,6 +543,30 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
}
break;
case kSetCompressionType: {
if (compression_type_record_read_) {
ReportCorruption(fragment.size(),
"read multiple SetCompressionType records");
}
if (first_record_read_) {
ReportCorruption(fragment.size(),
"SetCompressionType not the first record");
}
fragments_.clear();
prospective_record_offset = physical_record_offset;
last_record_offset_ = prospective_record_offset;
in_fragmented_record_ = false;
CompressionTypeRecord compression_record(kNoCompression);
Status s = compression_record.DecodeFrom(&fragment);
if (!s.ok()) {
ReportCorruption(fragment.size(),
"could not decode SetCompressionType record");
} else {
compression_type_ = compression_record.GetCompressionType();
}
break;
}
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u",

@ -8,14 +8,16 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <memory>
#include <stdint.h>
#include <memory>
#include "db/log_format.h"
#include "file/sequence_file_reader.h"
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "util/compression.h"
namespace ROCKSDB_NAMESPACE {
class Logger;
@ -128,6 +130,13 @@ class Reader {
// Whether this is a recycled log file
bool recycled_;
// Whether the first record has been read or not.
bool first_record_read_;
// Type of compression used
CompressionType compression_type_;
// Track whether the compression type record has been read or not.
bool compression_type_record_read_;
// Extend record types with the following special values
enum {
kEof = kMaxRecordType + 1,

@ -47,7 +47,8 @@ static std::string RandomSkewedString(int i, Random* rnd) {
// Param type is tuple<int, bool>
// get<0>(tuple): non-zero if recycling log, zero if regular log
// get<1>(tuple): true if allow retry after read EOF, false otherwise
class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
class LogTest
: public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> {
private:
class StringSource : public FSSequentialFile {
public:
@ -143,23 +144,27 @@ class LogTest : public ::testing::TestWithParam<std::tuple<int, bool>> {
test::StringSink* sink_;
StringSource* source_;
ReportCollector report_;
std::unique_ptr<Writer> writer_;
std::unique_ptr<Reader> reader_;
protected:
std::unique_ptr<Writer> writer_;
std::unique_ptr<Reader> reader_;
bool allow_retry_read_;
CompressionType compression_type_;
public:
LogTest()
: reader_contents_(),
sink_(new test::StringSink(&reader_contents_)),
source_(new StringSource(reader_contents_, !std::get<1>(GetParam()))),
allow_retry_read_(std::get<1>(GetParam())) {
allow_retry_read_(std::get<1>(GetParam())),
compression_type_(std::get<2>(GetParam())) {
std::unique_ptr<FSWritableFile> sink_holder(sink_);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(sink_holder), "" /* don't care */, FileOptions()));
writer_.reset(
new Writer(std::move(file_writer), 123, std::get<0>(GetParam())));
Writer* writer =
new Writer(std::move(file_writer), 123, std::get<0>(GetParam()), false,
compression_type_);
writer_.reset(writer);
std::unique_ptr<FSSequentialFile> source_holder(source_);
std::unique_ptr<SequentialFileReader> file_reader(
new SequentialFileReader(std::move(source_holder), "" /* file name */));
@ -676,11 +681,11 @@ TEST_P(LogTest, Recycle) {
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P(bool, LogTest,
::testing::Values(std::make_tuple(0, false),
std::make_tuple(0, true),
std::make_tuple(1, false),
std::make_tuple(1, true)));
// Do NOT enable compression for this instantiation.
INSTANTIATE_TEST_CASE_P(
Log, LogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
::testing::Values(CompressionType::kNoCompression)));
class RetriableLogTest : public ::testing::TestWithParam<int> {
private:
@ -892,6 +897,27 @@ TEST_P(RetriableLogTest, NonBlockingReadFullRecord) {
INSTANTIATE_TEST_CASE_P(bool, RetriableLogTest, ::testing::Values(0, 2));
class CompressionLogTest : public LogTest {
public:
Status SetupTestEnv() { return writer_->AddCompressionTypeRecord(); }
};
TEST_P(CompressionLogTest, Empty) {
ASSERT_OK(SetupTestEnv());
const bool compression_enabled =
std::get<2>(GetParam()) == kNoCompression ? false : true;
// If WAL compression is enabled, a record is added for the compression type
const int compression_record_size = compression_enabled ? kHeaderSize + 4 : 0;
ASSERT_EQ(compression_record_size, WrittenBytes());
ASSERT_EQ("EOF", Read());
}
INSTANTIATE_TEST_CASE_P(
Compression, CompressionLogTest,
::testing::Combine(::testing::Values(0, 1), ::testing::Bool(),
::testing::Values(CompressionType::kNoCompression,
CompressionType::kZSTD)));
} // namespace log
} // namespace ROCKSDB_NAMESPACE

@ -19,12 +19,14 @@ namespace ROCKSDB_NAMESPACE {
namespace log {
Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
bool recycle_log_files, bool manual_flush)
bool recycle_log_files, bool manual_flush,
CompressionType compression_type)
: dest_(std::move(dest)),
block_offset_(0),
log_number_(log_number),
recycle_log_files_(recycle_log_files),
manual_flush_(manual_flush) {
manual_flush_(manual_flush),
compression_type_(compression_type) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1);
@ -112,6 +114,31 @@ IOStatus Writer::AddRecord(const Slice& slice) {
return s;
}
IOStatus Writer::AddCompressionTypeRecord() {
// Should be the first record
assert(block_offset_ == 0);
if (compression_type_ == kNoCompression) {
// No need to add a record
return IOStatus::OK();
}
CompressionTypeRecord record(compression_type_);
std::string encode;
record.EncodeTo(&encode);
IOStatus s =
EmitPhysicalRecord(kSetCompressionType, encode.data(), encode.size());
if (s.ok()) {
if (!manual_flush_) {
s = dest_->Flush();
}
} else {
// Disable compression if the record could not be added.
compression_type_ = kNoCompression;
}
return s;
}
bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); }
IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
@ -126,7 +153,7 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
buf[6] = static_cast<char>(t);
uint32_t crc = type_crc_[t];
if (t < kRecyclableFullType) {
if (t < kRecyclableFullType || t == kSetCompressionType) {
// Legacy record format
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
header_size = kHeaderSize;

@ -12,6 +12,7 @@
#include <memory>
#include "db/log_format.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/io_status.h"
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
@ -72,7 +73,8 @@ class Writer {
// "*dest" must remain live while this Writer is in use.
explicit Writer(std::unique_ptr<WritableFileWriter>&& dest,
uint64_t log_number, bool recycle_log_files,
bool manual_flush = false);
bool manual_flush = false,
CompressionType compressionType = kNoCompression);
// No copying allowed
Writer(const Writer&) = delete;
void operator=(const Writer&) = delete;
@ -80,6 +82,7 @@ class Writer {
~Writer();
IOStatus AddRecord(const Slice& slice);
IOStatus AddCompressionTypeRecord();
WritableFileWriter* file() { return dest_.get(); }
const WritableFileWriter* file() const { return dest_.get(); }
@ -108,6 +111,9 @@ class Writer {
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::WriteBuffer()
bool manual_flush_;
// Compression Type
CompressionType compression_type_;
};
} // namespace log

@ -1182,7 +1182,8 @@ struct DBOptions {
// This feature is WORK IN PROGRESS
// If enabled WAL records will be compressed before they are written.
// Only zstd is supported.
// Only zstd is supported. Compressed WAL records will be read in supported
// versions regardless of the wal_compression settings.
CompressionType wal_compression = kNoCompression;
// If true, RocksDB supports flushing multiple column families and committing

@ -763,7 +763,7 @@ DEFINE_bool(disable_wal, false, "If true, do not write WAL for write.");
DEFINE_bool(manual_wal_flush, false,
"If true, buffer WAL until buffer is full or a manual FlushWAL().");
DEFINE_string(wal_compression, "string",
DEFINE_string(wal_compression, "none",
"Algorithm to use for WAL compression. none to disable.");
static enum ROCKSDB_NAMESPACE::CompressionType FLAGS_wal_compression_e =
ROCKSDB_NAMESPACE::kNoCompression;

@ -514,6 +514,26 @@ inline bool ZSTDNotFinal_Supported() {
#endif
}
inline bool ZSTD_Streaming_Supported() {
#ifdef ZSTD
return ZSTD_versionNumber() >= 10300;
#else
return false;
#endif
}
inline bool StreamingCompressionTypeSupported(
CompressionType compression_type) {
switch (compression_type) {
case kNoCompression:
return true;
case kZSTD:
return ZSTD_Streaming_Supported();
default:
return false;
}
}
inline bool CompressionTypeSupported(CompressionType compression_type) {
switch (compression_type) {
case kNoCompression:
@ -1535,4 +1555,42 @@ inline CacheAllocationPtr UncompressData(
}
}
// Records the compression type for subsequent WAL records.
class CompressionTypeRecord {
public:
explicit CompressionTypeRecord(CompressionType compression_type)
: compression_type_(compression_type) {}
CompressionType GetCompressionType() const { return compression_type_; }
inline void EncodeTo(std::string* dst) const {
assert(dst != nullptr);
PutFixed32(dst, compression_type_);
}
inline Status DecodeFrom(Slice* src) {
constexpr char class_name[] = "CompressionTypeRecord";
uint32_t val;
if (!GetFixed32(src, &val)) {
return Status::Corruption(class_name,
"Error decoding WAL compression type");
}
CompressionType compression_type = static_cast<CompressionType>(val);
if (!StreamingCompressionTypeSupported(compression_type)) {
return Status::Corruption(class_name,
"WAL compression type not supported");
}
compression_type_ = compression_type;
return Status::OK();
}
inline std::string DebugString() const {
return "compression_type: " + CompressionTypeToString(compression_type_);
}
private:
CompressionType compression_type_;
};
} // namespace ROCKSDB_NAMESPACE

Loading…
Cancel
Save