Add support in log writer and reader for a user-defined timestamp size record (#11433)

Summary:
This patch adds support for writing and reading a user-defined timestamp size record in the log writer and log reader. It will be used by WAL logs to persist the user-defined timestamp format for subsequent WriteBatch records. Reading and writing UDT sizes for WAL logs is not included in this patch; that will come in a follow-up.

The semantics of the record are as follows: at write time, one such record is added whenever the log writer encounters a non-zero UDT size it has not recorded so far. At read time, all such records read up to a given point are accumulated and apply to all subsequent WriteBatch records.
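
For illustration, here is a minimal sketch of how the new writer and reader APIs are meant to fit together, modeled on the unit tests added in this patch. The `writer` and `reader` objects, the column family id, and the payload string are placeholder assumptions; construction of the underlying log file is elided.

```cpp
// Illustrative sketch only (not code from this patch): persist a UDT size
// record before the WriteBatch records it applies to, then recover it.
#include <cassert>
#include <string>
#include <unordered_map>

#include "db/log_reader.h"
#include "db/log_writer.h"

void WriteAndReadUdtSizes(ROCKSDB_NAMESPACE::log::Writer& writer,
                          ROCKSDB_NAMESPACE::log::Reader& reader) {
  using ROCKSDB_NAMESPACE::IOStatus;
  using ROCKSDB_NAMESPACE::Slice;

  // Hypothetical setup: column family 1 uses an 8-byte user-defined timestamp.
  std::unordered_map<uint32_t, size_t> cf_to_ts_sz = {{1, sizeof(uint64_t)}};

  // Emits one kUserDefinedTimestampSizeType (or recyclable) record, but only
  // for sizes not yet recorded in this log; zero sizes are skipped.
  IOStatus io_s = writer.MaybeAddUserDefinedTimestampSizeRecord(cf_to_ts_sz);
  assert(io_s.ok());
  io_s = writer.AddRecord(Slice("WriteBatch payload goes here"));
  assert(io_s.ok());

  // On the read side, ReadRecord() consumes timestamp size records
  // internally; the accumulated sizes apply to all subsequent records.
  std::string scratch;
  Slice record;
  while (reader.ReadRecord(&record, &scratch)) {
    const auto& recorded = reader.GetRecordedTimestampSize();
    // `recorded` now maps column family id -> timestamp size, e.g. {1: 8}.
    (void)recorded;
  }
}
```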

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11433

Test Plan:
```
make clean && make -j32 all
./log_test --gtest_filter="*WithTimestampSize*"
```

Reviewed By: ltamasi

Differential Revision: D45678708

Pulled By: jowlyzhang

fbshipit-source-id: b770c8f45bb7b9383b14aac9f22af781304fb41d
Authored by Yu Zhang, committed by Facebook GitHub Bot
parent 8827cd0618
commit 47235dda9e
7 changed files:

1. db/log_format.h (12 lines changed)
2. db/log_reader.cc (180 lines changed)
3. db/log_reader.h (16 lines changed)
4. db/log_test.cc (129 lines changed)
5. db/log_writer.cc (34 lines changed)
6. db/log_writer.h (16 lines changed)
7. util/udt_util.h (77 lines changed)

db/log_format.h

@@ -35,17 +35,21 @@ enum RecordType {
   // Compression Type
   kSetCompressionType = 9,
+
+  // User-defined timestamp sizes
+  kUserDefinedTimestampSizeType = 10,
+  kRecyclableUserDefinedTimestampSizeType = 11,
 };
-static const int kMaxRecordType = kSetCompressionType;
+constexpr int kMaxRecordType = kRecyclableUserDefinedTimestampSizeType;
 
-static const unsigned int kBlockSize = 32768;
+constexpr unsigned int kBlockSize = 32768;
 
 // Header is checksum (4 bytes), length (2 bytes), type (1 byte)
-static const int kHeaderSize = 4 + 2 + 1;
+constexpr int kHeaderSize = 4 + 2 + 1;
 
 // Recyclable header is checksum (4 bytes), length (2 bytes), type (1 byte),
 // log number (4 bytes).
-static const int kRecyclableHeaderSize = 4 + 2 + 1 + 4;
+constexpr int kRecyclableHeaderSize = 4 + 2 + 1 + 4;
 
 }  // namespace log
 }  // namespace ROCKSDB_NAMESPACE

db/log_reader.cc

@@ -164,6 +164,54 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
+
+      case kSetCompressionType: {
+        if (compression_type_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "read multiple SetCompressionType records");
+        }
+        if (first_record_read_) {
+          ReportCorruption(fragment.size(),
+                           "SetCompressionType not the first record");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->clear();
+        last_record_offset_ = prospective_record_offset;
+        CompressionTypeRecord compression_record(kNoCompression);
+        Status s = compression_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(fragment.size(),
+                           "could not decode SetCompressionType record");
+        } else {
+          InitCompression(compression_record);
+        }
+        break;
+      }
+      case kUserDefinedTimestampSizeType:
+      case kRecyclableUserDefinedTimestampSizeType: {
+        if (in_fragmented_record && !scratch->empty()) {
+          ReportCorruption(
+              scratch->size(),
+              "user-defined timestamp size record interspersed partial record");
+        }
+        prospective_record_offset = physical_record_offset;
+        scratch->clear();
+        last_record_offset_ = prospective_record_offset;
+        UserDefinedTimestampSizeRecord ts_record;
+        Status s = ts_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(
+              fragment.size(),
+              "could not decode user-defined timestamp size record");
+        } else {
+          s = UpdateRecordedTimestampSize(
+              ts_record.GetUserDefinedTimestampSize());
+          if (!s.ok()) {
+            ReportCorruption(fragment.size(), s.getState());
+          }
+        }
+        break;
+      }
       case kBadHeader:
         if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency ||
             wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {

@@ -257,29 +305,6 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
-
-      case kSetCompressionType: {
-        if (compression_type_record_read_) {
-          ReportCorruption(fragment.size(),
-                           "read multiple SetCompressionType records");
-        }
-        if (first_record_read_) {
-          ReportCorruption(fragment.size(),
-                           "SetCompressionType not the first record");
-        }
-        prospective_record_offset = physical_record_offset;
-        scratch->clear();
-        last_record_offset_ = prospective_record_offset;
-        CompressionTypeRecord compression_record(kNoCompression);
-        Status s = compression_record.DecodeFrom(&fragment);
-        if (!s.ok()) {
-          ReportCorruption(fragment.size(),
-                           "could not decode SetCompressionType record");
-        } else {
-          InitCompression(compression_record);
-        }
-        break;
-      }
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u", record_type);

@@ -444,7 +469,8 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
   const unsigned int type = header[6];
   const uint32_t length = a | (b << 8);
   int header_size = kHeaderSize;
-  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+  if ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
+      type == kRecyclableUserDefinedTimestampSizeType) {
     if (end_of_buffer_offset_ - buffer_.size() == 0) {
       recycled_ = true;
     }

@@ -500,7 +526,9 @@ unsigned int Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size,
   buffer_.remove_prefix(header_size + length);
-  if (!uncompress_ || type == kSetCompressionType) {
+  if (!uncompress_ || type == kSetCompressionType ||
+      type == kUserDefinedTimestampSizeType ||
+      type == kRecyclableUserDefinedTimestampSizeType) {
     *result = Slice(header + header_size, length);
     return type;
   } else {

@@ -567,6 +595,26 @@ void Reader::InitCompression(const CompressionTypeRecord& compression_record) {
   assert(uncompressed_buffer_);
 }
 
+Status Reader::UpdateRecordedTimestampSize(
+    const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz) {
+  for (const auto& [cf, ts_sz] : cf_to_ts_sz) {
+    // Zero user-defined timestamp size are not recorded.
+    if (ts_sz == 0) {
+      return Status::Corruption(
+          "User-defined timestamp size record contains zero timestamp size.");
+    }
+    // The user-defined timestamp size record for a column family should not be
+    // updated in the same log file.
+    if (recorded_cf_to_ts_sz_.count(cf) != 0) {
+      return Status::Corruption(
+          "User-defined timestamp size record contains update to "
+          "recorded column family.");
+    }
+    recorded_cf_to_ts_sz_.insert(std::make_pair(cf, ts_sz));
+  }
+  return Status::OK();
+}
+
 bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
                                         WALRecoveryMode /*unused*/,
                                         uint64_t* /* checksum */) {

@@ -635,30 +683,6 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         }
         break;
-
-      case kBadHeader:
-      case kBadRecord:
-      case kEof:
-      case kOldRecord:
-        if (in_fragmented_record_) {
-          ReportCorruption(fragments_.size(), "error in middle of record");
-          in_fragmented_record_ = false;
-          fragments_.clear();
-        }
-        break;
-
-      case kBadRecordChecksum:
-        if (recycled_) {
-          fragments_.clear();
-          return false;
-        }
-        ReportCorruption(drop_size, "checksum mismatch");
-        if (in_fragmented_record_) {
-          ReportCorruption(fragments_.size(), "error in middle of record");
-          in_fragmented_record_ = false;
-          fragments_.clear();
-        }
-        break;
       case kSetCompressionType: {
         if (compression_type_record_read_) {
           ReportCorruption(fragment.size(),

@@ -683,6 +707,57 @@ bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch,
         break;
       }
+
+      case kUserDefinedTimestampSizeType:
+      case kRecyclableUserDefinedTimestampSizeType: {
+        if (in_fragmented_record_ && !scratch->empty()) {
+          ReportCorruption(
+              scratch->size(),
+              "user-defined timestamp size record interspersed partial record");
+        }
+        fragments_.clear();
+        prospective_record_offset = physical_record_offset;
+        last_record_offset_ = prospective_record_offset;
+        in_fragmented_record_ = false;
+        UserDefinedTimestampSizeRecord ts_record;
+        Status s = ts_record.DecodeFrom(&fragment);
+        if (!s.ok()) {
+          ReportCorruption(
+              fragment.size(),
+              "could not decode user-defined timestamp size record");
+        } else {
+          s = UpdateRecordedTimestampSize(
+              ts_record.GetUserDefinedTimestampSize());
+          if (!s.ok()) {
+            ReportCorruption(fragment.size(), s.getState());
+          }
+        }
+        break;
+      }
+
+      case kBadHeader:
+      case kBadRecord:
+      case kEof:
+      case kOldRecord:
+        if (in_fragmented_record_) {
+          ReportCorruption(fragments_.size(), "error in middle of record");
+          in_fragmented_record_ = false;
+          fragments_.clear();
+        }
+        break;
+
+      case kBadRecordChecksum:
+        if (recycled_) {
+          fragments_.clear();
+          return false;
+        }
+        ReportCorruption(drop_size, "checksum mismatch");
+        if (in_fragmented_record_) {
+          ReportCorruption(fragments_.size(), "error in middle of record");
+          in_fragmented_record_ = false;
+          fragments_.clear();
+        }
+        break;
       default: {
         char buf[40];
         snprintf(buf, sizeof(buf), "unknown record type %u",

@@ -770,7 +845,8 @@ bool FragmentBufferedReader::TryReadFragment(
   const unsigned int type = header[6];
   const uint32_t length = a | (b << 8);
   int header_size = kHeaderSize;
-  if (type >= kRecyclableFullType && type <= kRecyclableLastType) {
+  if ((type >= kRecyclableFullType && type <= kRecyclableLastType) ||
+      type == kRecyclableUserDefinedTimestampSizeType) {
     if (end_of_buffer_offset_ - buffer_.size() == 0) {
       recycled_ = true;
     }

@@ -822,7 +898,9 @@ bool FragmentBufferedReader::TryReadFragment(
   buffer_.remove_prefix(header_size + length);
-  if (!uncompress_ || type == kSetCompressionType) {
+  if (!uncompress_ || type == kSetCompressionType ||
+      type == kUserDefinedTimestampSizeType ||
+      type == kRecyclableUserDefinedTimestampSizeType) {
     *fragment = Slice(header + header_size, length);
     *fragment_type_or_err = type;
     return true;

db/log_reader.h

@@ -11,6 +11,8 @@
 #include <stdint.h>
 
 #include <memory>
+#include <unordered_map>
+#include <vector>
 
 #include "db/log_format.h"
 #include "file/sequence_file_reader.h"

@@ -18,6 +20,7 @@
 #include "rocksdb/slice.h"
 #include "rocksdb/status.h"
 #include "util/compression.h"
+#include "util/udt_util.h"
 #include "util/xxhash.h"
 
 namespace ROCKSDB_NAMESPACE {

@@ -74,6 +77,12 @@ class Reader {
                       WALRecoveryMode::kTolerateCorruptedTailRecords,
                   uint64_t* record_checksum = nullptr);
 
+  // Return the recorded user-defined timestamp size that have been read so
+  // far. This only applies to WAL logs.
+  const std::unordered_map<uint32_t, size_t>& GetRecordedTimestampSize() const {
+    return recorded_cf_to_ts_sz_;
+  }
+
   // Returns the physical offset of the last record returned by ReadRecord.
   //
   // Undefined before the first call to ReadRecord.

@@ -154,6 +163,10 @@ class Reader {
   // Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
   XXH3_state_t* uncompress_hash_state_;
 
+  // The recorded user-defined timestamp sizes that have been read so far. This
+  // is only for WAL logs.
+  std::unordered_map<uint32_t, size_t> recorded_cf_to_ts_sz_;
+
   // Extend record types with the following special values
   enum {
     kEof = kMaxRecordType + 1,

@@ -190,6 +203,9 @@ class Reader {
   void ReportDrop(size_t bytes, const Status& reason);
 
   void InitCompression(const CompressionTypeRecord& compression_record);
+
+  Status UpdateRecordedTimestampSize(
+      const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz);
 };
 
 class FragmentBufferedReader : public Reader {

db/log_test.cc

@@ -45,9 +45,10 @@ static std::string RandomSkewedString(int i, Random* rnd) {
   return BigString(NumberString(i), rnd->Skewed(17));
 }
 
-// Param type is tuple<int, bool>
+// Param type is tuple<int, bool, CompressionType>
 // get<0>(tuple): non-zero if recycling log, zero if regular log
 // get<1>(tuple): true if allow retry after read EOF, false otherwise
+// get<2>(tuple): type of compression used
 class LogTest
     : public ::testing::TestWithParam<std::tuple<int, bool, CompressionType>> {
  private:

@@ -181,20 +182,30 @@ class LogTest
   Slice* get_reader_contents() { return &reader_contents_; }
 
-  void Write(const std::string& msg) {
+  void Write(
+      const std::string& msg,
+      const std::unordered_map<uint32_t, size_t>* cf_to_ts_sz = nullptr) {
+    if (cf_to_ts_sz != nullptr && !cf_to_ts_sz->empty()) {
+      ASSERT_OK(writer_->MaybeAddUserDefinedTimestampSizeRecord(*cf_to_ts_sz));
+    }
     ASSERT_OK(writer_->AddRecord(Slice(msg)));
   }
 
   size_t WrittenBytes() const { return dest_contents().size(); }
 
-  std::string Read(const WALRecoveryMode wal_recovery_mode =
-                       WALRecoveryMode::kTolerateCorruptedTailRecords) {
+  std::string Read(
+      const WALRecoveryMode wal_recovery_mode =
+          WALRecoveryMode::kTolerateCorruptedTailRecords,
+      std::unordered_map<uint32_t, size_t>* cf_to_ts_sz = nullptr) {
     std::string scratch;
     Slice record;
     bool ret = false;
     uint64_t record_checksum;
     ret = reader_->ReadRecord(&record, &scratch, wal_recovery_mode,
                               &record_checksum);
+    if (cf_to_ts_sz != nullptr) {
+      *cf_to_ts_sz = reader_->GetRecordedTimestampSize();
+    }
     if (ret) {
       if (!allow_retry_read_) {
         // allow_retry_read_ means using FragmentBufferedReader which does not

@@ -257,6 +268,17 @@ class LogTest
       return "OK";
     }
   }
+
+  void CheckRecordAndTimestampSize(
+      std::string record,
+      std::unordered_map<uint32_t, size_t>& expected_ts_sz) {
+    std::unordered_map<uint32_t, size_t> recorded_ts_sz;
+    ASSERT_EQ(record,
+              Read(WALRecoveryMode::
+                       kTolerateCorruptedTailRecords /* wal_recovery_mode */,
+                   &recorded_ts_sz));
+    EXPECT_EQ(expected_ts_sz, recorded_ts_sz);
+  }
 };
 
 TEST_P(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }

@@ -274,6 +296,43 @@ TEST_P(LogTest, ReadWrite) {
   ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
 }
 
+TEST_P(LogTest, ReadWriteWithTimestampSize) {
+  std::unordered_map<uint32_t, size_t> ts_sz_one = {
+      {1, sizeof(uint64_t)},
+  };
+  Write("foo", &ts_sz_one);
+  Write("bar");
+  std::unordered_map<uint32_t, size_t> ts_sz_two = {{2, sizeof(char)}};
+  Write("", &ts_sz_two);
+  Write("xxxx");
+
+  CheckRecordAndTimestampSize("foo", ts_sz_one);
+  CheckRecordAndTimestampSize("bar", ts_sz_one);
+  std::unordered_map<uint32_t, size_t> expected_ts_sz_two;
+  // User-defined timestamp size records are accumulated and applied to
+  // subsequent records.
+  expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end());
+  expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end());
+  CheckRecordAndTimestampSize("", expected_ts_sz_two);
+  CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two);
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
+}
+
+TEST_P(LogTest, ReadWriteWithTimestampSizeZeroTimestampIgnored) {
+  std::unordered_map<uint32_t, size_t> ts_sz_one = {{1, sizeof(uint64_t)}};
+  Write("foo", &ts_sz_one);
+  std::unordered_map<uint32_t, size_t> ts_sz_two(ts_sz_one.begin(),
+                                                 ts_sz_one.end());
+  ts_sz_two.insert(std::make_pair(2, 0));
+  Write("bar", &ts_sz_two);
+
+  CheckRecordAndTimestampSize("foo", ts_sz_one);
+  CheckRecordAndTimestampSize("bar", ts_sz_one);
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
+}
+
 TEST_P(LogTest, ManyBlocks) {
   for (int i = 0; i < 100000; i++) {
     Write(NumberString(i));

@@ -685,6 +744,39 @@ TEST_P(LogTest, Recycle) {
   ASSERT_EQ("EOF", Read());
 }
 
+TEST_P(LogTest, RecycleWithTimestampSize) {
+  bool recyclable_log = (std::get<0>(GetParam()) != 0);
+  if (!recyclable_log) {
+    return;  // test is only valid for recycled logs
+  }
+  std::unordered_map<uint32_t, size_t> ts_sz_one = {
+      {1, sizeof(uint32_t)},
+  };
+  Write("foo", &ts_sz_one);
+  Write("bar");
+  Write("baz");
+  Write("bif");
+  Write("blitz");
+  while (get_reader_contents()->size() < log::kBlockSize * 2) {
+    Write("xxxxxxxxxxxxxxxx");
+  }
+  std::unique_ptr<FSWritableFile> sink(
+      new test::OverwritingStringSink(get_reader_contents()));
+  std::unique_ptr<WritableFileWriter> dest_holder(new WritableFileWriter(
+      std::move(sink), "" /* don't care */, FileOptions()));
+  Writer recycle_writer(std::move(dest_holder), 123, true);
+  std::unordered_map<uint32_t, size_t> ts_sz_two = {
+      {2, sizeof(uint64_t)},
+  };
+  ASSERT_OK(recycle_writer.MaybeAddUserDefinedTimestampSizeRecord(ts_sz_two));
+  ASSERT_OK(recycle_writer.AddRecord(Slice("foooo")));
+  ASSERT_OK(recycle_writer.AddRecord(Slice("bar")));
+  ASSERT_GE(get_reader_contents()->size(), log::kBlockSize * 2);
+  CheckRecordAndTimestampSize("foooo", ts_sz_two);
+  CheckRecordAndTimestampSize("bar", ts_sz_two);
+  ASSERT_EQ("EOF", Read());
+}
+
 // Do NOT enable compression for this instantiation.
 INSTANTIATE_TEST_CASE_P(
     Log, LogTest,

@@ -940,6 +1032,35 @@ TEST_P(CompressionLogTest, ReadWrite) {
   ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
 }
 
+TEST_P(CompressionLogTest, ReadWriteWithTimestampSize) {
+  CompressionType compression_type = std::get<2>(GetParam());
+  if (!StreamingCompressionTypeSupported(compression_type)) {
+    ROCKSDB_GTEST_SKIP("Test requires support for compression type");
+    return;
+  }
+  ASSERT_OK(SetupTestEnv());
+  std::unordered_map<uint32_t, size_t> ts_sz_one = {
+      {1, sizeof(uint64_t)},
+  };
+  Write("foo", &ts_sz_one);
+  Write("bar");
+  std::unordered_map<uint32_t, size_t> ts_sz_two = {{2, sizeof(char)}};
+  Write("", &ts_sz_two);
+  Write("xxxx");
+
+  CheckRecordAndTimestampSize("foo", ts_sz_one);
+  CheckRecordAndTimestampSize("bar", ts_sz_one);
+  std::unordered_map<uint32_t, size_t> expected_ts_sz_two;
+  // User-defined timestamp size records are accumulated and applied to
+  // subsequent records.
+  expected_ts_sz_two.insert(ts_sz_one.begin(), ts_sz_one.end());
+  expected_ts_sz_two.insert(ts_sz_two.begin(), ts_sz_two.end());
+  CheckRecordAndTimestampSize("", expected_ts_sz_two);
+  CheckRecordAndTimestampSize("xxxx", expected_ts_sz_two);
+  ASSERT_EQ("EOF", Read());
+  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
+}
+
 TEST_P(CompressionLogTest, ManyBlocks) {
   CompressionType compression_type = std::get<2>(GetParam());
   if (!StreamingCompressionTypeSupported(compression_type)) {

db/log_writer.cc

@@ -16,6 +16,7 @@
 #include "rocksdb/io_status.h"
 #include "util/coding.h"
 #include "util/crc32c.h"
+#include "util/udt_util.h"
 
 namespace ROCKSDB_NAMESPACE {
 namespace log {

@@ -73,7 +74,6 @@ IOStatus Writer::AddRecord(const Slice& slice,
   // Fragment the record if necessary and emit it. Note that if slice
   // is empty, we still want to iterate once to emit a single
   // zero-length record
-  IOStatus s;
   bool begin = true;
   int compress_remaining = 0;
   bool compress_start = false;

@@ -81,6 +81,8 @@ IOStatus Writer::AddRecord(const Slice& slice,
     compress_->Reset();
     compress_start = true;
   }
+
+  IOStatus s;
   do {
     const int64_t leftover = kBlockSize - block_offset_;
     assert(leftover >= 0);

@@ -194,6 +196,33 @@ IOStatus Writer::AddCompressionTypeRecord() {
   return s;
 }
 
+IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
+    const std::unordered_map<uint32_t, size_t>& cf_to_ts_sz,
+    Env::IOPriority rate_limiter_priority) {
+  std::vector<std::pair<uint32_t, size_t>> ts_sz_to_record;
+  for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) {
+    if (recorded_cf_to_ts_sz_.count(cf_id) != 0) {
+      // A column family's user-defined timestamp size should not be
+      // updated while DB is running.
+      assert(recorded_cf_to_ts_sz_[cf_id] == ts_sz);
+    } else if (ts_sz != 0) {
+      ts_sz_to_record.emplace_back(cf_id, ts_sz);
+      recorded_cf_to_ts_sz_.insert(std::make_pair(cf_id, ts_sz));
+    }
+  }
+  if (ts_sz_to_record.empty()) {
+    return IOStatus::OK();
+  }
+
+  UserDefinedTimestampSizeRecord record(std::move(ts_sz_to_record));
+  std::string encoded;
+  record.EncodeTo(&encoded);
+  RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType
+                                       : kUserDefinedTimestampSizeType;
+  return EmitPhysicalRecord(type, encoded.data(), encoded.size(),
+                            rate_limiter_priority);
+}
+
 bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); }
 
 IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n,

@@ -209,7 +238,8 @@ IOStatus Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n,
   buf[6] = static_cast<char>(t);
   uint32_t crc = type_crc_[t];
-  if (t < kRecyclableFullType || t == kSetCompressionType) {
+  if (t < kRecyclableFullType || t == kSetCompressionType ||
+      t == kUserDefinedTimestampSizeType) {
     // Legacy record format
     assert(block_offset_ + kHeaderSize + n <= kBlockSize);
     header_size = kHeaderSize;

db/log_writer.h

@@ -10,6 +10,8 @@
 #include <cstdint>
 
 #include <memory>
+#include <unordered_map>
+#include <vector>
 
 #include "db/log_format.h"
 #include "rocksdb/compression_type.h"

@@ -87,6 +89,15 @@ class Writer {
       Env::IOPriority rate_limiter_priority = Env::IO_TOTAL);
 
   IOStatus AddCompressionTypeRecord();
 
+  // If there are column families in `cf_to_ts_sz` not included in
+  // `recorded_cf_to_ts_sz_` and its user-defined timestamp size is non-zero,
+  // adds a record of type kUserDefinedTimestampSizeType or
+  // kRecyclableUserDefinedTimestampSizeType for these column families.
+  // This timestamp size record applies to all subsequent records.
+  IOStatus MaybeAddUserDefinedTimestampSizeRecord(
+      const std::unordered_map<uint32_t, size_t>& cf_to_ts_sz,
+      Env::IOPriority rate_limiter_priority = Env::IO_TOTAL);
+
   WritableFileWriter* file() { return dest_.get(); }
   const WritableFileWriter* file() const { return dest_.get(); }

@@ -122,6 +133,11 @@ class Writer {
   StreamingCompress* compress_;
   // Reusable compressed output buffer
   std::unique_ptr<char[]> compressed_buffer_;
+
+  // The recorded user-defined timestamp size that have been written so far.
+  // Since the user-defined timestamp size cannot be changed while the DB is
+  // running, existing entry in this map cannot be updated.
+  std::unordered_map<uint32_t, size_t> recorded_cf_to_ts_sz_;
 };
 
 }  // namespace log

util/udt_util.h (new file)

@@ -0,0 +1,77 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#include <sstream>
+#include <vector>
+
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+// Dummy record in WAL logs signaling user-defined timestamp sizes for
+// subsequent records.
+class UserDefinedTimestampSizeRecord {
+ public:
+  UserDefinedTimestampSizeRecord() {}
+  explicit UserDefinedTimestampSizeRecord(
+      std::vector<std::pair<uint32_t, size_t>>&& cf_to_ts_sz)
+      : cf_to_ts_sz_(std::move(cf_to_ts_sz)) {}
+
+  const std::vector<std::pair<uint32_t, size_t>>& GetUserDefinedTimestampSize()
+      const {
+    return cf_to_ts_sz_;
+  }
+
+  inline void EncodeTo(std::string* dst) const {
+    assert(dst != nullptr);
+    for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) {
+      assert(ts_sz != 0);
+      PutFixed32(dst, cf_id);
+      PutFixed16(dst, static_cast<uint16_t>(ts_sz));
+    }
+  }
+
+  inline Status DecodeFrom(Slice* src) {
+    const size_t total_size = src->size();
+    if ((total_size % kSizePerColumnFamily) != 0) {
+      std::ostringstream oss;
+      oss << "User-defined timestamp size record length: " << total_size
+          << " is not a multiple of " << kSizePerColumnFamily << std::endl;
+      return Status::Corruption(oss.str());
+    }
+    int num_of_entries = static_cast<int>(total_size / kSizePerColumnFamily);
+    for (int i = 0; i < num_of_entries; i++) {
+      uint32_t cf_id = 0;
+      uint16_t ts_sz = 0;
+      if (!GetFixed32(src, &cf_id) || !GetFixed16(src, &ts_sz)) {
+        return Status::Corruption(
+            "Error decoding user-defined timestamp size record entry");
+      }
+      cf_to_ts_sz_.emplace_back(cf_id, static_cast<size_t>(ts_sz));
+    }
+    return Status::OK();
+  }
+
+  inline std::string DebugString() const {
+    std::ostringstream oss;
+    for (const auto& [cf_id, ts_sz] : cf_to_ts_sz_) {
+      oss << "Column family: " << cf_id
+          << ", user-defined timestamp size: " << ts_sz << std::endl;
+    }
+    return oss.str();
+  }
+
+ private:
+  // 4 bytes for column family id, 2 bytes for user-defined timestamp size.
+  static constexpr size_t kSizePerColumnFamily = 4 + 2;
+
+  std::vector<std::pair<uint32_t, size_t>> cf_to_ts_sz_;
+};
+}  // namespace ROCKSDB_NAMESPACE
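
As a worked illustration of the format above (a 4-byte column family id plus a 2-byte timestamp size per entry), a hypothetical round trip through `EncodeTo`/`DecodeFrom` could look like the following; this snippet is not part of the patch.

```cpp
// Hypothetical round-trip check for UserDefinedTimestampSizeRecord,
// for illustration only.
#include <cassert>
#include <string>

#include "util/udt_util.h"

void UdtRecordRoundTrip() {
  using ROCKSDB_NAMESPACE::Slice;
  using ROCKSDB_NAMESPACE::Status;
  using ROCKSDB_NAMESPACE::UserDefinedTimestampSizeRecord;

  // Two column families: cf 1 uses 8-byte timestamps, cf 2 uses 1-byte.
  UserDefinedTimestampSizeRecord record({{1, 8}, {2, 1}});
  std::string encoded;
  record.EncodeTo(&encoded);
  assert(encoded.size() == 12);  // 6 bytes (4 + 2) per column family.

  Slice input(encoded);
  UserDefinedTimestampSizeRecord decoded;
  Status s = decoded.DecodeFrom(&input);
  assert(s.ok());
  assert(decoded.GetUserDefinedTimestampSize().size() == 2);
}
```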