From 47235dda9e729fd41027d6050aae12667d57ea6f Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Thu, 11 May 2023 17:26:19 -0700 Subject: [PATCH] Add support in log writer and reader for a user-defined timestamp size record (#11433) Summary: This patch adds support to write and read a user-defined timestamp size record in log writer and log reader. It will be used by WAL logs to persist the user-defined timestamp format for subsequent WriteBatch records. Reading and writing UDT sizes for WAL logs is not included in this patch; that will come in a follow-up. The semantics of the record are: at write time, one such record is added when the log writer encounters a non-zero UDT size it has not recorded so far. At read time, all such records read up to a given point are accumulated and apply to all subsequent WriteBatch records. Pull Request resolved: https://github.com/facebook/rocksdb/pull/11433 Test Plan: ``` make clean && make -j32 all ./log_test --gtest_filter="*WithTimestampSize*" ``` Reviewed By: ltamasi Differential Revision: D45678708 Pulled By: jowlyzhang fbshipit-source-id: b770c8f45bb7b9383b14aac9f22af781304fb41d --- db/log_format.h | 12 ++-- db/log_reader.cc | 180 +++++++++++++++++++++++++++++++++-------------- db/log_reader.h | 16 +++++ db/log_test.cc | 129 +++++++++++++++++++++++++++++++-- db/log_writer.cc | 34 ++++++++- db/log_writer.h | 16 +++++ util/udt_util.h | 77 ++++++++++++++++++++ 7 files changed, 403 insertions(+), 61 deletions(-) create mode 100644 util/udt_util.h diff --git a/db/log_format.h b/db/log_format.h index d397372f4..a976b3f9e 100644 --- a/db/log_format.h +++ b/db/log_format.h @@ -35,17 +35,21 @@ enum RecordType { // Compression Type kSetCompressionType = 9, + + // User-defined timestamp sizes + kUserDefinedTimestampSizeType = 10, + kRecyclableUserDefinedTimestampSizeType = 11, }; -static const int kMaxRecordType = kSetCompressionType; +constexpr int kMaxRecordType = kRecyclableUserDefinedTimestampSizeType; -static const unsigned int kBlockSize = 
32768; +constexpr unsigned int kBlockSize = 32768; // Header is checksum (4 bytes), length (2 bytes), type (1 byte) -static const int kHeaderSize = 4 + 2 + 1; +constexpr int kHeaderSize = 4 + 2 + 1; // Recyclable header is checksum (4 bytes), length (2 bytes), type (1 byte), // log number (4 bytes). -static const int kRecyclableHeaderSize = 4 + 2 + 1 + 4; +constexpr int kRecyclableHeaderSize = 4 + 2 + 1 + 4; } // namespace log } // namespace ROCKSDB_NAMESPACE diff --git a/db/log_reader.cc b/db/log_reader.cc index 575a7d758..5ec262dcd 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -164,6 +164,54 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; + case kSetCompressionType: { + if (compression_type_record_read_) { + ReportCorruption(fragment.size(), + "read multiple SetCompressionType records"); + } + if (first_record_read_) { + ReportCorruption(fragment.size(), + "SetCompressionType not the first record"); + } + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + CompressionTypeRecord compression_record(kNoCompression); + Status s = compression_record.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption(fragment.size(), + "could not decode SetCompressionType record"); + } else { + InitCompression(compression_record); + } + break; + } + case kUserDefinedTimestampSizeType: + case kRecyclableUserDefinedTimestampSizeType: { + if (in_fragmented_record && !scratch->empty()) { + ReportCorruption( + scratch->size(), + "user-defined timestamp size record interspersed partial record"); + } + prospective_record_offset = physical_record_offset; + scratch->clear(); + last_record_offset_ = prospective_record_offset; + UserDefinedTimestampSizeRecord ts_record; + Status s = ts_record.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption( + fragment.size(), + "could not decode user-defined timestamp size record"); + } else { + s = UpdateRecordedTimestampSize( + 
ts_record.GetUserDefinedTimestampSize()); + if (!s.ok()) { + ReportCorruption(fragment.size(), s.getState()); + } + } + break; + } + case kBadHeader: if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { @@ -257,29 +305,6 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, } break; - case kSetCompressionType: { - if (compression_type_record_read_) { - ReportCorruption(fragment.size(), - "read multiple SetCompressionType records"); - } - if (first_record_read_) { - ReportCorruption(fragment.size(), - "SetCompressionType not the first record"); - } - prospective_record_offset = physical_record_offset; - scratch->clear(); - last_record_offset_ = prospective_record_offset; - CompressionTypeRecord compression_record(kNoCompression); - Status s = compression_record.DecodeFrom(&fragment); - if (!s.ok()) { - ReportCorruption(fragment.size(), - "could not decode SetCompressionType record"); - } else { - InitCompression(compression_record); - } - break; - } - default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", record_type); @@ -444,7 +469,8 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, const unsigned int type = header[6]; const uint32_t length = a | (b << 8); int header_size = kHeaderSize; - if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || + type == kRecyclableUserDefinedTimestampSizeType) { if (end_of_buffer_offset_ - buffer_.size() == 0) { recycled_ = true; } @@ -500,7 +526,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, buffer_.remove_prefix(header_size + length); - if (!uncompress_ || type == kSetCompressionType) { + if (!uncompress_ || type == kSetCompressionType || + type == kUserDefinedTimestampSizeType || + type == kRecyclableUserDefinedTimestampSizeType) { *result = Slice(header + header_size, length); return 
type; } else { @@ -567,6 +595,26 @@ void Reader::InitCompression(const CompressionTypeRecord& compression_record) { assert(uncompressed_buffer_); } +Status Reader::UpdateRecordedTimestampSize( + const std::vector>& cf_to_ts_sz) { + for (const auto& [cf, ts_sz] : cf_to_ts_sz) { + // Zero user-defined timestamp size are not recorded. + if (ts_sz == 0) { + return Status::Corruption( + "User-defined timestamp size record contains zero timestamp size."); + } + // The user-defined timestamp size record for a column family should not be + // updated in the same log file. + if (recorded_cf_to_ts_sz_.count(cf) != 0) { + return Status::Corruption( + "User-defined timestamp size record contains update to " + "recorded column family."); + } + recorded_cf_to_ts_sz_.insert(std::make_pair(cf, ts_sz)); + } + return Status::OK(); +} + bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, WALRecoveryMode /*unused*/, uint64_t* /* checksum */) { @@ -635,30 +683,6 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, } break; - case kBadHeader: - case kBadRecord: - case kEof: - case kOldRecord: - if (in_fragmented_record_) { - ReportCorruption(fragments_.size(), "error in middle of record"); - in_fragmented_record_ = false; - fragments_.clear(); - } - break; - - case kBadRecordChecksum: - if (recycled_) { - fragments_.clear(); - return false; - } - ReportCorruption(drop_size, "checksum mismatch"); - if (in_fragmented_record_) { - ReportCorruption(fragments_.size(), "error in middle of record"); - in_fragmented_record_ = false; - fragments_.clear(); - } - break; - case kSetCompressionType: { if (compression_type_record_read_) { ReportCorruption(fragment.size(), @@ -683,6 +707,57 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, break; } + case kUserDefinedTimestampSizeType: + case kRecyclableUserDefinedTimestampSizeType: { + if (in_fragmented_record_ && !scratch->empty()) { + ReportCorruption( + 
scratch->size(), + "user-defined timestamp size record interspersed partial record"); + } + fragments_.clear(); + prospective_record_offset = physical_record_offset; + last_record_offset_ = prospective_record_offset; + in_fragmented_record_ = false; + UserDefinedTimestampSizeRecord ts_record; + Status s = ts_record.DecodeFrom(&fragment); + if (!s.ok()) { + ReportCorruption( + fragment.size(), + "could not decode user-defined timestamp size record"); + } else { + s = UpdateRecordedTimestampSize( + ts_record.GetUserDefinedTimestampSize()); + if (!s.ok()) { + ReportCorruption(fragment.size(), s.getState()); + } + } + break; + } + + case kBadHeader: + case kBadRecord: + case kEof: + case kOldRecord: + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + + case kBadRecordChecksum: + if (recycled_) { + fragments_.clear(); + return false; + } + ReportCorruption(drop_size, "checksum mismatch"); + if (in_fragmented_record_) { + ReportCorruption(fragments_.size(), "error in middle of record"); + in_fragmented_record_ = false; + fragments_.clear(); + } + break; + default: { char buf[40]; snprintf(buf, sizeof(buf), "unknown record type %u", @@ -770,7 +845,8 @@ bool FragmentBufferedReader::TryReadFragment( const unsigned int type = header[6]; const uint32_t length = a | (b << 8); int header_size = kHeaderSize; - if (type >= kRecyclableFullType && type <= kRecyclableLastType) { + if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || + type == kRecyclableUserDefinedTimestampSizeType) { if (end_of_buffer_offset_ - buffer_.size() == 0) { recycled_ = true; } @@ -822,7 +898,9 @@ bool FragmentBufferedReader::TryReadFragment( buffer_.remove_prefix(header_size + length); - if (!uncompress_ || type == kSetCompressionType) { + if (!uncompress_ || type == kSetCompressionType || + type == kUserDefinedTimestampSizeType || + type == 
kRecyclableUserDefinedTimestampSizeType) { *fragment = Slice(header + header_size, length); *fragment_type_or_err = type; return true; diff --git a/db/log_reader.h b/db/log_reader.h index e3be1570e..b7b298e58 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -11,6 +11,8 @@ #include #include +#include +#include #include "db/log_format.h" #include "file/sequence_file_reader.h" @@ -18,6 +20,7 @@ #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "util/compression.h" +#include "util/udt_util.h" #include "util/xxhash.h" namespace ROCKSDB_NAMESPACE { @@ -74,6 +77,12 @@ class Reader { WALRecoveryMode::kTolerateCorruptedTailRecords, uint64_t* record_checksum = nullptr); + // Return the recorded user-defined timestamp size that have been read so + // far. This only applies to WAL logs. + const std::unordered_map& GetRecordedTimestampSize() const { + return recorded_cf_to_ts_sz_; + } + // Returns the physical offset of the last record returned by ReadRecord. // // Undefined before the first call to ReadRecord. @@ -154,6 +163,10 @@ class Reader { // Used for stream hashing uncompressed buffer in ReadPhysicalRecord() XXH3_state_t* uncompress_hash_state_; + // The recorded user-defined timestamp sizes that have been read so far. This + // is only for WAL logs. 
+ std::unordered_map recorded_cf_to_ts_sz_; + // Extend record types with the following special values enum { kEof = kMaxRecordType + 1, @@ -190,6 +203,9 @@ class Reader { void ReportDrop(size_t bytes, const Status& reason); void InitCompression(const CompressionTypeRecord& compression_record); + + Status UpdateRecordedTimestampSize( + const std::vector>& cf_to_ts_sz); }; class FragmentBufferedReader : public Reader { diff --git a/db/log_test.cc b/db/log_test.cc index f4d388f41..7cc4c198e 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -45,9 +45,10 @@ static std::string RandomSkewedString(int i, Random* rnd) { return BigString(NumberString(i), rnd->Skewed(17)); } -// Param type is tuple +// Param type is tuple // get<0>(tuple): non-zero if recycling log, zero if regular log // get<1>(tuple): true if allow retry after read EOF, false otherwise +// get<2>(tuple): type of compression used class LogTest : public ::testing::TestWithParam> { private: @@ -181,20 +182,30 @@ class LogTest Slice* get_reader_contents() { return &reader_contents_; } - void Write(const std::string& msg) { + void Write( + const std::string& msg, + const std::unordered_map* cf_to_ts_sz = nullptr) { + if (cf_to_ts_sz != nullptr && !cf_to_ts_sz->empty()) { + ASSERT_OK(writer_->MaybeAddUserDefinedTimestampSizeRecord(*cf_to_ts_sz)); + } ASSERT_OK(writer_->AddRecord(Slice(msg))); } size_t WrittenBytes() const { return dest_contents().size(); } - std::string Read(const WALRecoveryMode wal_recovery_mode = - WALRecoveryMode::kTolerateCorruptedTailRecords) { + std::string Read( + const WALRecoveryMode wal_recovery_mode = + WALRecoveryMode::kTolerateCorruptedTailRecords, + std::unordered_map* cf_to_ts_sz = nullptr) { std::string scratch; Slice record; bool ret = false; uint64_t record_checksum; ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode, &record_checksum); + if (cf_to_ts_sz != nullptr) { + *cf_to_ts_sz = reader_->GetRecordedTimestampSize(); + } if (ret) { if (!allow_retry_read_) { 
// allow_retry_read_ means using FragmentBufferedReader which does not @@ -257,6 +268,17 @@ class LogTest return "OK"; } } + + void CheckRecordAndTimestampSize( + std::string record, + std::unordered_map& expected_ts_sz) { + std::unordered_map recorded_ts_sz; + ASSERT_EQ(record, + Read(WALRecoveryMode:: + kTolerateCorruptedTailRecords /* wal_recovery_mode */, + &recorded_ts_sz)); + EXPECT_EQ(expected_ts_sz, recorded_ts_sz); + } }; TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); } @@ -274,6 +296,43 @@ TEST_P(LogTest, ReadWrite) { ASSERT_EQ("EOF", Read()); // Make sure reads at eof work } +TEST_P(LogTest, ReadWriteWithTimestampSize) { + std::unordered_map ts_sz_one = { + {1, sizeof(uint64_t)}, + }; + Write("foo", &ts_sz_one); + Write("bar"); + std::unordered_map ts_sz_two = {{2, sizeof(char)}}; + Write("", &ts_sz_two); + Write("xxxx"); + + CheckRecordAndTimestampSize("foo", ts_sz_one); + CheckRecordAndTimestampSize("bar", ts_sz_one); + std::unordered_map expected_ts_sz_two; + // User-defined timestamp size records are accumulated and applied to + // subsequent records. 
+ expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end()); + expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end()); + CheckRecordAndTimestampSize("", expected_ts_sz_two); + CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("EOF", Read()); // Make sure reads at eof work +} + +TEST_P(LogTest, ReadWriteWithTimestampSizeZeroTimestampIgnored) { + std::unordered_map ts_sz_one = {{1, sizeof(uint64_t)}}; + Write("foo", &ts_sz_one); + std::unordered_map ts_sz_two(ts_sz_one.begin(), + ts_sz_one.end()); + ts_sz_two.insert(std::make_pair(2, 0)); + Write("bar", &ts_sz_two); + + CheckRecordAndTimestampSize("foo", ts_sz_one); + CheckRecordAndTimestampSize("bar", ts_sz_one); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("EOF", Read()); // Make sure reads at eof work +} + TEST_P(LogTest, ManyBlocks) { for (int i = 0; i < 100000; i++) { Write(NumberString(i)); @@ -685,6 +744,39 @@ TEST_P(LogTest, Recycle) { ASSERT_EQ("EOF", Read()); } +TEST_P(LogTest, RecycleWithTimestampSize) { + bool recyclable_log = (std::get<0>(GetParam()) != 0); + if (!recyclable_log) { + return; // test is only valid for recycled logs + } + std::unordered_map ts_sz_one = { + {1, sizeof(uint32_t)}, + }; + Write("foo", &ts_sz_one); + Write("bar"); + Write("baz"); + Write("bif"); + Write("blitz"); + while (get_reader_contents()->size() < log::kBlockSize * 2) { + Write("xxxxxxxxxxxxxxxx"); + } + std::unique_ptr sink( + new test::OverwritingStringSink(get_reader_contents())); + std::unique_ptr dest_holder(new WritableFileWriter( + std::move(sink), "" /* don't care */, FileOptions())); + Writer recycle_writer(std::move(dest_holder), 123, true); + std::unordered_map ts_sz_two = { + {2, sizeof(uint64_t)}, + }; + ASSERT_OK(recycle_writer.MaybeAddUserDefinedTimestampSizeRecord(ts_sz_two)); + ASSERT_OK(recycle_writer.AddRecord(Slice("foooo"))); + ASSERT_OK(recycle_writer.AddRecord(Slice("bar"))); + ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2); + 
CheckRecordAndTimestampSize("foooo", ts_sz_two); + CheckRecordAndTimestampSize("bar", ts_sz_two); + ASSERT_EQ("EOF", Read()); +} + // Do NOT enable compression for this instantiation. INSTANTIATE_TEST_CASE_P( Log, LogTest, @@ -940,6 +1032,35 @@ TEST_P(CompressionLogTest, ReadWrite) { ASSERT_EQ("EOF", Read()); // Make sure reads at eof work } +TEST_P(CompressionLogTest, ReadWriteWithTimestampSize) { + CompressionType compression_type = std::get<2>(GetParam()); + if (!StreamingCompressionTypeSupported(compression_type)) { + ROCKSDB_GTEST_SKIP("Test requires support for compression type"); + return; + } + ASSERT_OK(SetupTestEnv()); + std::unordered_map ts_sz_one = { + {1, sizeof(uint64_t)}, + }; + Write("foo", &ts_sz_one); + Write("bar"); + std::unordered_map ts_sz_two = {{2, sizeof(char)}}; + Write("", &ts_sz_two); + Write("xxxx"); + + CheckRecordAndTimestampSize("foo", ts_sz_one); + CheckRecordAndTimestampSize("bar", ts_sz_one); + std::unordered_map expected_ts_sz_two; + // User-defined timestamp size records are accumulated and applied to + // subsequent records. + expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end()); + expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end()); + CheckRecordAndTimestampSize("", expected_ts_sz_two); + CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two); + ASSERT_EQ("EOF", Read()); + ASSERT_EQ("EOF", Read()); // Make sure reads at eof work +} + TEST_P(CompressionLogTest, ManyBlocks) { CompressionType compression_type = std::get<2>(GetParam()); if (!StreamingCompressionTypeSupported(compression_type)) { diff --git a/db/log_writer.cc b/db/log_writer.cc index 56f58543e..1ddf0a26c 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -16,6 +16,7 @@ #include "rocksdb/io_status.h" #include "util/coding.h" #include "util/crc32c.h" +#include "util/udt_util.h" namespace ROCKSDB_NAMESPACE { namespace log { @@ -73,7 +74,6 @@ IOStatus Writer::AddRecord(const Slice& slice, // Fragment the record if necessary and emit it. 
Note that if slice // is empty, we still want to iterate once to emit a single // zero-length record - IOStatus s; bool begin = true; int compress_remaining = 0; bool compress_start = false; @@ -81,6 +81,8 @@ IOStatus Writer::AddRecord(const Slice& slice, compress_->Reset(); compress_start = true; } + + IOStatus s; do { const int64_t leftover = kBlockSize - block_offset_; assert(leftover >= 0); @@ -194,6 +196,33 @@ IOStatus Writer::AddCompressionTypeRecord() { return s; } +IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( + const std::unordered_map& cf_to_ts_sz, + Env::IOPriority rate_limiter_priority) { + std::vector> ts_sz_to_record; + for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) { + if (recorded_cf_to_ts_sz_.count(cf_id) != 0) { + // A column family's user-defined timestamp size should not be + // updated while DB is running. + assert(recorded_cf_to_ts_sz_[cf_id] == ts_sz); + } else if (ts_sz != 0) { + ts_sz_to_record.emplace_back(cf_id, ts_sz); + recorded_cf_to_ts_sz_.insert(std::make_pair(cf_id, ts_sz)); + } + } + if (ts_sz_to_record.empty()) { + return IOStatus::OK(); + } + + UserDefinedTimestampSizeRecord record(std::move(ts_sz_to_record)); + std::string encoded; + record.EncodeTo(&encoded); + RecordType type = recycle_log_files_ ? 
kRecyclableUserDefinedTimestampSizeType + : kUserDefinedTimestampSizeType; + return EmitPhysicalRecord(type, encoded.data(), encoded.size(), + rate_limiter_priority); +} + bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); } IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, @@ -209,7 +238,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n, buf[6] = static_cast(t); uint32_t crc = type_crc_[t]; - if (t < kRecyclableFullType || t == kSetCompressionType) { + if (t < kRecyclableFullType || t == kSetCompressionType || + t == kUserDefinedTimestampSizeType) { // Legacy record format assert(block_offset_ + kHeaderSize + n <= kBlockSize); header_size = kHeaderSize; diff --git a/db/log_writer.h b/db/log_writer.h index 5d266e434..75e44d1a4 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -10,6 +10,8 @@ #include #include +#include +#include #include "db/log_format.h" #include "rocksdb/compression_type.h" @@ -87,6 +89,15 @@ class Writer { Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); IOStatus AddCompressionTypeRecord(); + // If there are column families in `cf_to_ts_sz` not included in + // `recorded_cf_to_ts_sz_` whose user-defined timestamp size is non-zero, + // adds a record of type kUserDefinedTimestampSizeType or + // kRecyclableUserDefinedTimestampSizeType for these column families. + // This timestamp size record applies to all subsequent records. + IOStatus MaybeAddUserDefinedTimestampSizeRecord( + const std::unordered_map& cf_to_ts_sz, + Env::IOPriority rate_limiter_priority = Env::IO_TOTAL); + WritableFileWriter* file() { return dest_.get(); } const WritableFileWriter* file() const { return dest_.get(); } @@ -122,6 +133,11 @@ class Writer { StreamingCompress* compress_; // Reusable compressed output buffer std::unique_ptr compressed_buffer_; + + // The recorded user-defined timestamp sizes that have been written so far. 
+ // Since the user-defined timestamp size cannot be changed while the DB is + // running, existing entry in this map cannot be updated. + std::unordered_map recorded_cf_to_ts_sz_; }; } // namespace log diff --git a/util/udt_util.h b/util/udt_util.h new file mode 100644 index 000000000..219a093d8 --- /dev/null +++ b/util/udt_util.h @@ -0,0 +1,77 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include +#include + +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +// Dummy record in WAL logs signaling user-defined timestamp sizes for +// subsequent records. +class UserDefinedTimestampSizeRecord { + public: + UserDefinedTimestampSizeRecord() {} + explicit UserDefinedTimestampSizeRecord( + std::vector>&& cf_to_ts_sz) + : cf_to_ts_sz_(std::move(cf_to_ts_sz)) {} + + const std::vector>& GetUserDefinedTimestampSize() + const { + return cf_to_ts_sz_; + } + + inline void EncodeTo(std::string* dst) const { + assert(dst != nullptr); + for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) { + assert(ts_sz != 0); + PutFixed32(dst, cf_id); + PutFixed16(dst, static_cast(ts_sz)); + } + } + + inline Status DecodeFrom(Slice* src) { + const size_t total_size = src->size(); + if ((total_size % kSizePerColumnFamily) != 0) { + std::ostringstream oss; + oss << "User-defined timestamp size record length: " << total_size + << " is not a multiple of " << kSizePerColumnFamily << std::endl; + return Status::Corruption(oss.str()); + } + int num_of_entries = static_cast(total_size / kSizePerColumnFamily); + for (int i = 0; i < num_of_entries; i++) { + uint32_t cf_id = 0; + uint16_t ts_sz = 0; + if (!GetFixed32(src, &cf_id) || !GetFixed16(src, &ts_sz)) { + return Status::Corruption( + "Error decoding 
user-defined timestamp size record entry"); + } + cf_to_ts_sz_.emplace_back(cf_id, static_cast(ts_sz)); + } + return Status::OK(); + } + + inline std::string DebugString() const { + std::ostringstream oss; + + for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) { + oss << "Column family: " << cf_id + << ", user-defined timestamp size: " << ts_sz << std::endl; + } + return oss.str(); + } + + private: + // 4 bytes for column family id, 2 bytes for user-defined timestamp size. + static constexpr size_t kSizePerColumnFamily = 4 + 2; + + std::vector> cf_to_ts_sz_; +}; + +} // namespace ROCKSDB_NAMESPACE